You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/21 11:09:11 UTC

[1/6] flink git commit: [hotfix] [core] Deprecate unused and redundant config parameter 'flink.base.dir.path'

Repository: flink
Updated Branches:
  refs/heads/master 50fd1a36d -> 0c9c04d19


[hotfix] [core] Deprecate unused and redundant config parameter 'flink.base.dir.path'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8780cb6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8780cb6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8780cb6c

Branch: refs/heads/master
Commit: 8780cb6c4fa05f7cd6167f1240d03f407d8d41ea
Parents: f63426b
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 17 14:43:37 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 19:43:15 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/configuration/ConfigConstants.java  | 7 ++++++-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala      | 4 ----
 .../flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java  | 2 --
 .../org/apache/flink/yarn/YarnApplicationMasterRunner.java    | 2 --
 4 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8780cb6c/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 76c1cf7..44a78f9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -748,8 +748,13 @@ public final class ConfigConstants {
 	// ----------------------------- Miscellaneous ----------------------------
 	
 	/**
-	 * The key to the Flink base directory path
+	 * The key to the Flink base directory path. It was initially used for configuring the
+	 * web UI, but is now obsolete.
+	 * 
+	 * @deprecated This parameter should not be used any more. A running Flink cluster should
+	 *             make no assumption about its location.
 	 */
+	@Deprecated
 	public static final String FLINK_BASE_DIR_PATH_KEY = "flink.base.dir.path";
 	
 	public static final String FLINK_JVM_OPTIONS = "env.java.opts";

http://git-wip-us.apache.org/repos/asf/flink/blob/8780cb6c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a335916..8b08181 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2401,10 +2401,6 @@ object JobManager {
       }
     }
 
-    if (new File(configDir).isDirectory) {
-      configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
-    }
-
     if (cliOptions.getWebUIPort() >= 0) {
       configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, cliOptions.getWebUIPort())
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/8780cb6c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
index a8aeb07..4b24f42 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -178,8 +178,6 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 
 		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
 
-		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
-
 		// add dynamic properties to JobManager configuration.
 		for (Map.Entry<String, String> property : additional.entrySet()) {
 			configuration.setString(property.getKey(), property.getValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/8780cb6c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 9d5673c..492fc0b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -484,8 +484,6 @@ public class YarnApplicationMasterRunner {
 
 		Configuration configuration = GlobalConfiguration.loadConfiguration(baseDirectory);
 
-		configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, baseDirectory);
-
 		// add dynamic properties to JobManager configuration.
 		for (Map.Entry<String, String> property : additional.entrySet()) {
 			configuration.setString(property.getKey(), property.getValue());


[4/6] flink git commit: [FLINK-4770] [core] Introduce 'CoreOptions'

Posted by se...@apache.org.
[FLINK-4770] [core] Introduce 'CoreOptions'

The CoreOptions should hold all essential configuration values that are not specific to
JobManager, TaskManager or any feature area, like HighAvailability or Security.

Examples of such values are:
  - default java options
  - default parallelism
  - default state backend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4047965
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4047965
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4047965

Branch: refs/heads/master
Commit: a4047965898adc0ba7bf74280a3b22792ced3399
Parents: 544f534
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 17 14:57:20 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 21:29:16 2017 +0100

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkSecuredITCase.java |  3 +-
 .../flink/configuration/ConfigConstants.java    | 11 +++++-
 .../apache/flink/configuration/CoreOptions.java | 40 ++++++++++++++++++++
 .../org/apache/flink/hdfstests/HDFSTest.java    |  4 +-
 .../clusterframework/BootstrapTools.java        |  3 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  |  4 +-
 .../clusterframework/BootstrapToolsTest.java    |  3 +-
 .../BlobLibraryCacheRecoveryITCase.java         |  4 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  3 +-
 .../streaming/runtime/tasks/StreamTask.java     |  6 +--
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  6 +--
 .../test/checkpointing/RescalingITCase.java     |  3 +-
 .../test/checkpointing/SavepointITCase.java     |  5 ++-
 .../utils/SavepointMigrationTestBase.java       |  3 +-
 .../test/classloading/ClassLoaderITCase.java    |  3 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  3 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |  4 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |  3 +-
 19 files changed, 86 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
index fa46fc7..768ca5e 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.fs;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -216,7 +217,7 @@ public class RollingSinkSecuredITCase extends RollingSinkITCase {
 			config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
 			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
-			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 			config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
 			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
 			config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 44a78f9..5129f20 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -741,8 +741,11 @@ public final class ConfigConstants {
 	// ----------------------------- Streaming --------------------------------
 	
 	/**
-	 * State backend for checkpoints;
+	 * State backend for checkpoints
+	 * 
+	 * @deprecated Use {@link CoreOptions#STATE_BACKEND} instead.
 	 */
+	@Deprecated
 	public static final String STATE_BACKEND = "state.backend";
 	
 	// ----------------------------- Miscellaneous ----------------------------
@@ -756,7 +759,11 @@ public final class ConfigConstants {
 	 */
 	@Deprecated
 	public static final String FLINK_BASE_DIR_PATH_KEY = "flink.base.dir.path";
-	
+
+	/**
+	 * @deprecated Use {@link CoreOptions#FLINK_JVM_OPTIONS} instead.
+	 */
+	@Deprecated
 	public static final String FLINK_JVM_OPTIONS = "env.java.opts";
 
 	// --------------------------- High Availability --------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
new file mode 100644
index 0000000..70e5f0b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+@PublicEvolving
+public class CoreOptions {
+
+	/**
+	 * The JVM options, read from the config key 'env.java.opts' (empty string by default).
+	 */
+	public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
+		.key("env.java.opts")
+		.defaultValue("");
+
+	public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
+		.key("parallelism.default")
+		.defaultValue(-1);
+	
+	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+		.key("state.backend")
+		.noDefaultValue();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 49db0f8..75e666f 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -230,7 +230,7 @@ public class HDFSTest {
 		org.apache.flink.configuration.Configuration
 			config = new org.apache.flink.configuration.Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
+		config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
 
 		BlobRecoveryITCase.testBlobServerRecovery(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index a0cf1d5..ebc9af8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -27,6 +27,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -358,7 +359,7 @@ public class BootstrapTools {
 			.put("jvmmem", 	"-Xms" + tmParams.taskManagerHeapSizeMB() + "m " +
 							"-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
 							"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
-		String javaOpts = flinkConfig.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
 		//applicable only for YarnMiniCluster secure test run
 		//krb5.conf file will be available as local resource in JM/TM container
 		if(hasKrb5) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index d043665..a7f792f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -55,7 +55,7 @@ public class BlobRecoveryITCase {
 	public void testBlobServerRecovery() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
 
 		testBlobServerRecovery(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index b08e1f4..1d100da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -209,7 +210,7 @@ public class BootstrapToolsTest {
 					true, true, true, this.getClass()));
 
 		// logback + log4j, with/out krb5, different JVM opts
-		cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+		cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
 		assertEquals(
 			java + " " + jvmmem +
 				" " + jvmOpts +

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index d3925be..7f75acc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -65,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
 
 		Configuration config = new Configuration();
 		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index 07cec32..42338cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
@@ -80,7 +81,7 @@ public class ZooKeeperTestUtils {
 		config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);
 
 		// File system state backend
-		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+		config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
 		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6b33d12..d734dc9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -694,7 +694,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		} else {
 			// see if we have a backend specified in the configuration
 			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
-			String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
+			String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null);
 
 			if (backendName == null) {
 				LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
@@ -731,7 +731,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 						throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
 					} catch (ClassCastException e) {
 						throw new IllegalConfigurationException("The class configured under '" +
-								ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+								CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
 								backendName + ')');
 					} catch (Throwable t) {
 						throw new IllegalConfigurationException("Cannot create configured state backend", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 508d2e1..6f0e881 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -1074,7 +1074,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	private static StreamTask<?, ?> createMockTask() {
 		Configuration configuration = new Configuration();
-		configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
+		configuration.setString(CoreOptions.STATE_BACKEND, "jobmanager");
 
 		StreamTask<?, ?> task = mock(StreamTask.class);
 		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a63ee8..887ea4f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -192,7 +192,7 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testStateBackendLoadingAndClosing() throws Exception {
 		Configuration taskManagerConfig = new Configuration();
-		taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+		taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
@@ -216,7 +216,7 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testStateBackendClosingOnFailure() throws Exception {
 		Configuration taskManagerConfig = new Configuration();
-		taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
+		taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 073632a..875d0ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -105,7 +106,7 @@ public class RescalingITCase extends TestLogger {
 		final File checkpointDir = temporaryFolder.newFolder();
 		final File savepointDir = temporaryFolder.newFolder();
 
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
 		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 77777d1..128522b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -168,7 +169,7 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
 			LOG.info("Created temporary savepoint directory: " + savepointDir + ".");
 
-			config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+			config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 			config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				checkpointDir.toURI().toString());
 			config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
@@ -701,7 +702,7 @@ public class SavepointITCase extends TestLogger {
 			fail("Test setup failed: failed to create temporary directories.");
 		}
 
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				checkpointDir.toURI().toString());
 		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 1a8a0a0..fced68c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -90,7 +91,7 @@ public class SavepointMigrationTestBase extends TestBaseUtils {
 		LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
 		LOG.info("Created savepoint directory: " + savepointDir + ".");
 
-		config.setString(ConfigConstants.STATE_BACKEND, "memory");
+		config.setString(CoreOptions.STATE_BACKEND, "memory");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
 		config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
 		config.setString("state.savepoints.dir", savepointDir.toURI().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index ca69e80..f25a302 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -97,7 +98,7 @@ public class ClassLoaderITCase extends TestLogger {
 		parallelism = 4;
 
 		// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
-		config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
+		config.setString(CoreOptions.STATE_BACKEND, "filesystem");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index d959e14..546e3d7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -118,7 +119,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
 		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
-			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
+			"@@" + CoreOptions.STATE_BACKEND + "=FILESYSTEM" +
 			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
 			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 21599c1..edf57b3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
@@ -1232,8 +1233,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// ------------------ Prepare Application Master Container  ------------------------------
 
 		// respect custom JVM options in the YAML file
-		String javaOpts =
-			flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
 		//applicable only for YarnMiniCluster secure test run
 		//krb5.conf file will be available as local resource in JM/TM container
 		if (hasKrb5) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a4047965/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 70ccae8..ad3ebcd 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.fs.Path;
@@ -203,7 +204,7 @@ public class YarnClusterDescriptorTest {
 				.getCommands().get(0));
 
 		// logback + log4j, with/out krb5, different JVM opts
-		cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+		cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
 		assertEquals(
 			java + " " + jvmmem +
 				" " + jvmOpts +


[2/6] flink git commit: [hotfix] [jobmanager] Minor code cleanups in JobGraph and CheckpointCoordinator

Posted by se...@apache.org.
[hotfix] [jobmanager] Minor code cleanups in JobGraph and CheckpointCoordinator

This makes the exception that can occur during serialization of the ExecutionConfig explicit,
and adds some comments to JobGraph.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f63426b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f63426b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f63426b0

Branch: refs/heads/master
Commit: f63426b0322e05fd0986ae5f224a69b1320724f6
Parents: 50fd1a3
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 16 18:34:51 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 19:43:15 2017 +0100

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |  9 ++-
 .../checkpoint/CheckpointCoordinator.java       |  2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java | 84 +++++++++++---------
 .../LeaderChangeJobRecoveryTest.java            |  8 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  4 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 10 ++-
 6 files changed, 69 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 6f7b04a..caeb43f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -83,6 +83,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -223,7 +224,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
 		// create the job graph object
 		JobGraph graph = new JobGraph(jobId, program.getJobName());
-		graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+		try {
+			graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
+		}
+		catch (IOException e) {
+			throw new CompilerException("Could not serialize the ExecutionConfig." +
+					"This indicates that non-serializable types (like custom serializers) were registered");
+		}
 
 		graph.setAllowQueuedScheduling(false);
 		graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 78cad91..6cac006 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -623,7 +623,7 @@ public class CheckpointCoordinator {
 	 * @return Flag indicating whether the ack'd checkpoint was associated
 	 * with a pending checkpoint.
 	 *
-	 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
+	 * @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store.
 	 */
 	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
 		if (shutdown || message == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6db9277..f6377e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -53,18 +53,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
  * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
- * but inside certain special vertices that establish the feedback channel amongst themselves.</p>
+ * but inside certain special vertices that establish the feedback channel amongst themselves.
  *
  * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate result
- * define the characteristics of the concrete operation and intermediate data.</p>
+ * define the characteristics of the concrete operation and intermediate data.
  */
 public class JobGraph implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	// --------------------------------------------------------------------------------------------
-	// Members that define the structure / topology of the graph
-	// --------------------------------------------------------------------------------------------
+	// --- job and configuration ---
 
 	/** List of task vertices included in this job graph. */
 	private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
@@ -72,12 +70,6 @@ public class JobGraph implements Serializable {
 	/** The job configuration attached to this job. */
 	private final Configuration jobConfiguration = new Configuration();
 
-	/** Set of JAR files required to run this job. */
-	private final List<Path> userJars = new ArrayList<Path>();
-
-	/** Set of blob keys identifying the JAR files required to run this job. */
-	private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
-
 	/** ID of this job. May be set if specific job id is desired (e.g. session management) */
 	private final JobID jobID;
 
@@ -94,18 +86,28 @@ public class JobGraph implements Serializable {
 	/** The mode in which the job is scheduled */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
-	/** The settings for asynchronous snapshots */
-	private JobSnapshottingSettings snapshotSettings;
-
-	/** List of classpaths required to run this job. */
-	private List<URL> classpaths = Collections.emptyList();
+	// --- checkpointing ---
 
 	/** Job specific execution config */
 	private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
+	/** The settings for the job checkpoints */
+	private JobSnapshottingSettings snapshotSettings;
+
 	/** Savepoint restore settings. */
 	private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
 
+	// --- attached resources ---
+
+	/** Set of JAR files required to run this job. */
+	private final List<Path> userJars = new ArrayList<Path>();
+
+	/** Set of blob keys identifying the JAR files required to run this job. */
+	private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
+
+	/** List of classpaths required to run this job. */
+	private List<URL> classpaths = Collections.emptyList();
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -129,7 +131,13 @@ public class JobGraph implements Serializable {
 	public JobGraph(JobID jobId, String jobName) {
 		this.jobID = jobId == null ? new JobID() : jobId;
 		this.jobName = jobName == null ? "(unnamed job)" : jobName;
-		setExecutionConfig(new ExecutionConfig());
+
+		try {
+			setExecutionConfig(new ExecutionConfig());
+		} catch (IOException e) {
+			// this should never happen, since an empty execution config is always serializable
+			throw new RuntimeException("bug, empty execution config is not serializable");
+		}
 	}
 
 	/**
@@ -260,17 +268,16 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig
-	 * object will not affect this serialized copy.
+	 * Sets the execution config. This method eagerly serializes the ExecutionConfig for future RPC
+	 * transport. Further modification of the referenced ExecutionConfig object will not affect
+	 * this serialized copy.
+	 * 
 	 * @param executionConfig The ExecutionConfig to be serialized.
+	 * @throws IOException Thrown if the serialization of the ExecutionConfig fails
 	 */
-	public void setExecutionConfig(ExecutionConfig executionConfig) {
+	public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException {
 		checkNotNull(executionConfig, "ExecutionConfig must not be null.");
-		try {
-			this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
-		} catch (IOException e) {
-			throw new RuntimeException("Could not serialize ExecutionConfig.", e);
-		}
+		this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
 	}
 
 	/**
@@ -362,6 +369,21 @@ public class JobGraph implements Serializable {
 		return classpaths;
 	}
 
+	/**
+	 * Gets the maximum parallelism of all operations in this job graph.
+	 *
+	 * @return The maximum parallelism of this job graph
+	 */
+	public int getMaximumParallelism() {
+		int maxParallelism = -1;
+		for (JobVertex vertex : taskVertices.values()) {
+			maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
+		}
+		return maxParallelism;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Topological Graph Access
 	// --------------------------------------------------------------------------------------------
 
 	public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
@@ -539,18 +561,6 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Gets the maximum parallelism of all operations in this job graph.
-	 * @return The maximum parallelism of this job graph
-	 */
-	public int getMaximumParallelism() {
-		int maxParallelism = -1;
-		for (JobVertex vertex : taskVertices.values()) {
-			maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
-		}
-		return maxParallelism;
-	}
-
-	/**
 	 * Uploads the previously added user JAR files to the job manager through
 	 * the job manager's BLOB server. The respective port is retrieved from the
 	 * JobManager. This function issues a blocking call.

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index be26e7b..fe33022 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -136,11 +135,6 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 		sender.setSlotSharingGroup(slotSharingGroup);
 		receiver.setSlotSharingGroup(slotSharingGroup);
 
-		ExecutionConfig executionConfig = new ExecutionConfig();
-
-		JobGraph jobGraph = new JobGraph("Blocking test job", sender, receiver);
-		jobGraph.setExecutionConfig(executionConfig);
-
-		return jobGraph;
+		return new JobGraph("Blocking test job", sender, receiver);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f656622..f90367c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+
 /**
  * Integration test cases for the {@link MiniCluster}.
  */
@@ -95,7 +97,7 @@ public class MiniClusterITCase extends TestLogger {
 		miniCluster.runJobBlocking(job);
 	}
 
-	private static JobGraph getSimpleJob() {
+	private static JobGraph getSimpleJob() throws IOException {
 		JobVertex task = new JobVertex("Test task");
 		task.setParallelism(1);
 		task.setMaxParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/f63426b0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8877c80..a4bb165 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -50,6 +51,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -129,7 +131,13 @@ public class StreamingJobGraphGenerator {
 		configureCheckpointing();
 
 		// set the ExecutionConfig last when it has been finalized
-		jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+		try {
+			jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+		}
+		catch (IOException e) {
+			throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
+					"This indicates that non-serializable types (like custom serializers) were registered");
+		}
 
 		return jobGraph;
 	}


[3/6] flink git commit: [FLINK-5074] [runtime] Add a ZooKeeper-based RunningJobRegistry

Posted by se...@apache.org.
[FLINK-5074] [runtime] Add a ZooKeeper-based RunningJobRegistry

This closes #2903


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544f5346
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544f5346
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544f5346

Branch: refs/heads/master
Commit: 544f53467b901e6e891a23fc4f2ef3a6be229718
Parents: 8780cb6
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Mon Feb 13 18:33:14 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 19:43:47 2017 +0100

----------------------------------------------------------------------
 .../HighAvailabilityServicesUtils.java          | 10 ++-
 .../highavailability/ZookeeperHaServices.java   |  6 +-
 .../highavailability/ZookeeperRegistry.java     | 94 ++++++++++++++++++++
 .../highavailability/ZooKeeperRegistryTest.java | 78 ++++++++++++++++
 4 files changed, 183 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 9113309..fe180de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 
 public class HighAvailabilityServicesUtils {
 
@@ -32,8 +34,8 @@ public class HighAvailabilityServicesUtils {
 				return new EmbeddedNonHaServices();
 
 			case ZOOKEEPER:
-				throw new UnsupportedOperationException("ZooKeeper high availability services " +
-						"have not been implemented yet.");
+				return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(config), 
+						Executors.directExecutor(), config);
 
 			default:
 				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
@@ -49,8 +51,8 @@ public class HighAvailabilityServicesUtils {
 				final String resourceManagerAddress = null;
 				return new NonHaServices(resourceManagerAddress);
 			case ZOOKEEPER:
-				throw new UnsupportedOperationException("ZooKeeper high availability services " +
-					"have not been implemented yet.");
+				return new ZookeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), 
+						Executors.directExecutor(), configuration);
 			default:
 				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index ed0ad17..741f9e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -97,10 +97,14 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	/** The runtime configuration */
 	private final Configuration configuration;
 
+	/** The zookeeper based running jobs registry */
+	private final RunningJobsRegistry runningJobsRegistry;
+
 	public ZookeeperHaServices(CuratorFramework client, Executor executor, Configuration configuration) {
 		this.client = checkNotNull(client);
 		this.executor = checkNotNull(executor);
 		this.configuration = checkNotNull(configuration);
+		this.runningJobsRegistry = new ZookeeperRegistry(client, configuration);
 	}
 
 	// ------------------------------------------------------------------------
@@ -149,7 +153,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 
 	@Override
 	public RunningJobsRegistry getRunningJobsRegistry() {
-		throw new UnsupportedOperationException("not yet implemented");
+		return runningJobsRegistry;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
new file mode 100644
index 0000000..c0621af
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperRegistry.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A ZooKeeper-based, highly available registry for running jobs.
+ */
+public class ZookeeperRegistry implements RunningJobsRegistry {
+	
+	private static final String DEFAULT_HA_JOB_REGISTRY_PATH = "/running_job_registry/";
+
+	/** The ZooKeeper client to use */
+	private final CuratorFramework client;
+
+	private final String runningJobPath;
+
+	private static final String HA_JOB_REGISTRY_PATH = "high-availability.zookeeper.job.registry";
+
+	public ZookeeperRegistry(final CuratorFramework client, final Configuration configuration) {
+		this.client = client;
+		runningJobPath = configuration.getValue(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT) + 
+			configuration.getString(HA_JOB_REGISTRY_PATH, DEFAULT_HA_JOB_REGISTRY_PATH);
+	}
+
+	@Override
+	public void setJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			String zkPath = runningJobPath + jobID.toString();
+			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+			this.client.setData().forPath(zkPath);
+		}
+		catch (Exception e) {
+			throw new IOException("Set running state to zk fail for job " + jobID.toString(), e);
+		}
+	}
+
+	@Override
+	public void setJobFinished(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			String zkPath = runningJobPath + jobID.toString();
+			this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
+			this.client.delete().forPath(zkPath);
+		}
+		catch (Exception e) {
+			throw new IOException("Set finished state to zk fail for job " + jobID.toString(), e);
+		}
+	}
+
+	@Override
+	public boolean isJobRunning(JobID jobID) throws IOException {
+		checkNotNull(jobID);
+
+		try {
+			Stat stat = client.checkExists().forPath(runningJobPath + jobID.toString());
+			if (stat != null) {
+				return true;
+			}
+			return false;
+		}
+		catch (Exception e) {
+			throw new IOException("Get running state from zk fail for job " + jobID.toString(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/544f5346/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
new file mode 100644
index 0000000..72982c8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/ZooKeeperRegistryTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+public class ZooKeeperRegistryTest extends TestLogger {
+	private TestingServer testingServer;
+
+	private static Logger LOG = LoggerFactory.getLogger(ZooKeeperRegistryTest.class);
+
+	@Before
+	public void before() throws Exception {
+		testingServer = new TestingServer();
+	}
+
+	@After
+	public void after() throws Exception {
+		testingServer.stop();
+		testingServer = null;
+	}
+
+	/**
+	 * Tests the functions of the ZooKeeper registry: setJobRunning(), setJobFinished(), and isJobRunning().
+	 */
+	@Test
+	public void testZooKeeperRegistry() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
+		configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+		HighAvailabilityServices zkHaService = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+		RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
+
+		try {
+			JobID jobID = JobID.generate();
+			assertTrue(!zkRegistry.isJobRunning(jobID));
+
+			zkRegistry.setJobRunning(jobID);
+			assertTrue(zkRegistry.isJobRunning(jobID));
+
+			zkRegistry.setJobFinished(jobID);
+			assertTrue(!zkRegistry.isJobRunning(jobID));
+
+		} finally {
+			if (zkHaService != null) {
+				zkHaService.close();
+			}
+		}
+	}
+}


[5/6] flink git commit: [FLINK-5821] [state backends] Rename the 'StateBackend' to 'StateBinder' and create root StateBackend interface

Posted by se...@apache.org.
[FLINK-5821] [state backends] Rename the 'StateBackend' to 'StateBinder' and create root StateBackend interface

StateBinder more correctly reflects what the interface does and clears up the name 'StateBackend'

The 'StateBackend' interface is now the root of the State Backend hierarchy (previously that was 'AbstractStateBackend')

This also extends a lot the JavaDocs of the core state classes, like StateBackend and StateObject


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f15603d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f15603d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f15603d8

Branch: refs/heads/master
Commit: f15603d81dad4861175093f4ad22eb2f8ccee4a0
Parents: a404796
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 13 14:29:37 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 21 01:16:26 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |   3 +-
 .../state/AggregatingStateDescriptor.java       |   4 +-
 .../common/state/FoldingStateDescriptor.java    |   4 +-
 .../api/common/state/ListStateDescriptor.java   |   4 +-
 .../common/state/ReducingStateDescriptor.java   |   4 +-
 .../flink/api/common/state/StateBackend.java    |  73 ----------
 .../flink/api/common/state/StateBinder.java     |  73 ++++++++++
 .../flink/api/common/state/StateDescriptor.java |   8 +-
 .../api/common/state/ValueStateDescriptor.java  |   4 +-
 .../api/common/state/ListStateDescriptor.java   |   4 +-
 .../state/AbstractKeyedStateBackend.java        |   4 +-
 .../runtime/state/AbstractStateBackend.java     |  31 ++--
 .../flink/runtime/state/StateBackend.java       | 145 +++++++++++++++++++
 .../apache/flink/runtime/state/StateObject.java |  41 ++++--
 14 files changed, 276 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 06cceda..6b09a8a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -19,7 +19,6 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.StateBackend;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
@@ -50,7 +49,7 @@ import java.util.UUID;
 import static java.util.Objects.requireNonNull;
 
 /**
- * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
+ * A State Backend that stores its state in {@code RocksDB}. This state backend can
  * store very large state that exceeds memory and spills to disk.
  *
  * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index abdac91..b7378d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -96,8 +96,8 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
 	// ------------------------------------------------------------------------
 
 	@Override
-	public AggregatingState<IN, OUT> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createAggregatingState(this);
+	public AggregatingState<IN, OUT> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createAggregatingState(this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 143945e..73bfaa8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -97,8 +97,8 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public FoldingState<T, ACC> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createFoldingState(this);
+	public FoldingState<T, ACC> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createFoldingState(this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index c03f8cb..ea28ad2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -79,8 +79,8 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	// ------------------------------------------------------------------------
 
 	@Override
-	public ListState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createListState(this);
+	public ListState<T> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createListState(this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index a1d4225..3edf1ca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -86,8 +86,8 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public ReducingState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createReducingState(this);
+	public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createReducingState(this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
deleted file mode 100644
index f9d1af7..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBackend.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/**
- * The {@code StateBackend} is used by {@link StateDescriptor} instances to create actual state
- * representations.
- */
-@PublicEvolving
-public interface StateBackend {
-
-	/**
-	 * Creates and returns a new {@link ValueState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the value that the {@code ValueState} can store.
-	 */
-	<T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ListState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the values that the {@code ListState} can store.
-	 */
-	<T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link ReducingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> The type of the values that the {@code ReducingState} can store.
-	 */
-	<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link AggregatingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <IN> The type of the values that go into the aggregating state
-	 * @param <ACC> The type of the values that are stored in the aggregating state   
-	 * @param <OUT> The type of the values that come out of the aggregating state   
-	 */
-	<IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
-			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
-
-	/**
-	 * Creates and returns a new {@link FoldingState}.
-	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
-	 *
-	 * @param <T> Type of the values folded into the state
-	 * @param <ACC> Type of the value in the state
-	 */
-	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
new file mode 100644
index 0000000..08dfc90
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * The {@code StateBinder} is used by {@link StateDescriptor} instances to create actual
+ * {@link State} objects.
+ */
+@Internal
+public interface StateBinder {
+
+	/**
+	 * Creates and returns a new {@link ValueState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> The type of the value that the {@code ValueState} can store.
+	 */
+	<T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link ListState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> The type of the values that the {@code ListState} can store.
+	 */
+	<T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link ReducingState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> The type of the values that the {@code ReducingState} can store.
+	 */
+	<T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link AggregatingState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <IN> The type of the values that go into the aggregating state
+	 * @param <ACC> The type of the values that are stored in the aggregating state   
+	 * @param <OUT> The type of the values that come out of the aggregating state   
+	 */
+	<IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
+			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;
+
+	/**
+	 * Creates and returns a new {@link FoldingState}.
+	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+	 *
+	 * @param <T> Type of the values folded into the state
+	 * @param <ACC> Type of the value in the state
+	 */
+	<T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index bc909e6..332e649 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -39,7 +39,7 @@ import static java.util.Objects.requireNonNull;
 /**
  * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
  * {@link State} in stateful operations. This contains the name and can create an actual state
- * object given a {@link StateBackend} using {@link #bind(StateBackend)}.
+ * object given a {@link StateBinder} using {@link #bind(StateBinder)}.
  *
  * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
  *
@@ -208,11 +208,11 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	}
 
 	/**
-	 * Creates a new {@link State} on the given {@link StateBackend}.
+	 * Creates a new {@link State} on the given {@link StateBinder}.
 	 *
-	 * @param stateBackend The {@code StateBackend} on which to create the {@link State}.
+	 * @param stateBinder The {@code StateBinder} on which to create the {@link State}.
 	 */
-	public abstract S bind(StateBackend stateBackend) throws Exception;
+	public abstract S bind(StateBinder stateBinder) throws Exception;
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index b3006c4..3afc8a7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -124,8 +124,8 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public ValueState<T> bind(StateBackend stateBackend) throws Exception {
-		return stateBackend.createValueState(this);
+	public ValueState<T> bind(StateBinder stateBinder) throws Exception {
+		return stateBinder.createValueState(this);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
index 28bc812..3b1af54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java
@@ -20,7 +20,7 @@ package org.apache.flink.migration.api.common.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateBinder;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -70,7 +70,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public ListState<T> bind(StateBackend stateBackend) throws Exception {
+	public ListState<T> bind(StateBinder stateBinder) throws Exception {
 		throw new IllegalStateException("Cannot bind states with a legacy state descriptor.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index c8e0d0d..fe5d1cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateBinder;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
@@ -264,7 +264,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		}
 
 		// create a new blank key/value state
-		S state = stateDescriptor.bind(new StateBackend() {
+		S state = stateDescriptor.bind(new StateBinder() {
 			@Override
 			public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
 				return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 60d035a..bc4594a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
@@ -26,27 +27,18 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import java.io.IOException;
 
 /**
- * A state backend defines how state is stored and snapshotted during checkpoints.
+ * An abstract base implementation of the {@link StateBackend} interface.
  */
-public abstract class AbstractStateBackend implements java.io.Serializable {
+@PublicEvolving
+public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
 	private static final long serialVersionUID = 4620415814639230247L;
 
-	/**
-	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
-	 * that should end up in a checkpoint.
-	 *
-	 * @param jobId              The {@link JobID} of the job for which we are creating checkpoint streams.
-	 * @param operatorIdentifier An identifier of the operator for which we create streams.
-	 */
+	@Override
 	public abstract CheckpointStreamFactory createStreamFactory(
 			JobID jobId,
-			String operatorIdentifier
-	) throws IOException;
+			String operatorIdentifier) throws IOException;
 
-	/**
-	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for keeping keyed state
-	 * and can be checkpointed to checkpoint streams.
-	 */
+	@Override
 	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,
@@ -54,16 +46,13 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			TaskKvStateRegistry kvStateRegistry
-	) throws Exception;
+			TaskKvStateRegistry kvStateRegistry) throws Exception;
 
-	/**
-	 * Creates a new {@link OperatorStateBackend} that can be used for storing partitionable operator
-	 * state in checkpoint streams.
-	 */
+	@Override
 	public OperatorStateBackend createOperatorStateBackend(
 			Environment env,
 			String operatorIdentifier) throws Exception {
+
 		return new DefaultOperatorStateBackend(env.getUserClassLoader());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
new file mode 100644
index 0000000..846df89
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+
+import java.io.IOException;
+
+/**
+ * A <b>State Backend</b> defines how the state of a streaming application is stored and
+ * checkpointed. Different State Backends store their state in different fashions, and use
+ * different data structures to hold the state of a running application.
+ *
+ * <p>For example, the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend memory state backend}
+ * keeps working state in the memory of the TaskManager and stores checkpoints in the memory of the
+ * JobManager. The backend is lightweight and without additional dependencies, but not highly available
+ * and supports only small state.
+ *
+ * <p>The {@link org.apache.flink.runtime.state.filesystem.FsStateBackend file system state backend}
+ * keeps working state in the memory of the TaskManager and stores state checkpoints in a filesystem
+ * (typically a replicated highly-available filesystem, like <a href="https://hadoop.apache.org/">HDFS</a>,
+ * <a href="https://ceph.com/">Ceph</a>, <a href="https://aws.amazon.com/documentation/s3/">S3</a>,
+ * <a href="https://cloud.google.com/storage/">GCS</a>, etc).
+ * 
+ * <p>The {@code RocksDBStateBackend} stores working state in <a href="http://rocksdb.org/">RocksDB</a>,
+ * and checkpoints the state by default to a filesystem (similar to the {@code FsStateBackend}).
+ * 
+ * <h2>Raw Bytes Storage and Backends</h2>
+ * 
+ * The {@code StateBackend} creates services for <i>raw bytes storage</i> and for <i>keyed state</i>
+ * and <i>operator state</i>.
+ * 
+ * <p>The <i>raw bytes storage</i> (through the {@link CheckpointStreamFactory}) is the fundamental
+ * service that simply stores bytes in a fault tolerant fashion. This service is used by the JobManager
+ * to store checkpoint and recovery metadata and is typically also used by the keyed- and operator state
+ * backends to store checkpointed state.
+ *
+ * <p>The {@link AbstractKeyedStateBackend} and {@link OperatorStateBackend} created by this state
+ * backend define how to hold the working state for keys and operators. They also define how to checkpoint
+ * that state, frequently using the raw bytes storage (via the {@code CheckpointStreamFactory}).
+ * However, it is also possible that for example a keyed state backend simply implements the bridge to
+ * a key/value store, and that it does not need to store anything in the raw byte storage upon a
+ * checkpoint.
+ * 
+ * <h2>Serializability</h2>
+ * 
+ * State Backends need to be {@link java.io.Serializable serializable}, because they are distributed
+ * across parallel processes (for distributed execution) together with the streaming application code. 
+ * 
+ * <p>Because of that, {@code StateBackend} implementations (typically subclasses
+ * of {@link AbstractStateBackend}) are meant to be like <i>factories</i> that create the proper
+ * state stores that provide access to the persistent storage and hold the keyed- and operator
+ * state data structures. That way, the State Backend can be very lightweight (contain only
+ * configurations) which makes it easier to be serializable.
+ * 
+ * 
+ * <h2>Thread Safety</h2>
+ * 
+ * State backend implementations have to be thread-safe. Multiple threads may be creating
+ * streams and keyed-/operator state backends concurrently.
+ */
+@PublicEvolving
+public interface StateBackend extends java.io.Serializable {
+
+	// ------------------------------------------------------------------------
+	//  Persistent Bytes Storage
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
+	 * that should end up in a checkpoint.
+	 *
+	 * @param jobId              The {@link JobID} of the job for which we are creating checkpoint streams.
+	 * @param operatorIdentifier An identifier of the operator for which we create streams.
+	 */
+	CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException;
+
+	// ------------------------------------------------------------------------
+	//  Structure Backends 
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
+	 * and checkpointing it.
+	 * 
+	 * <p><i>Keyed State</i> is state where each value is bound to a key.
+	 * 
+	 * @param env
+	 * @param jobID
+	 * @param operatorIdentifier
+	 * @param keySerializer
+	 * @param numberOfKeyGroups
+	 * @param keyGroupRange
+	 * @param kvStateRegistry
+	 * 
+	 * @param <K> The type of the keys by which the state is organized.
+	 *     
+	 * @return The Keyed State Backend for the given job, operator, and key group range.
+	 * 
+	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
+	 */
+	<K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+			Environment env,
+			JobID jobID,
+			String operatorIdentifier,
+			TypeSerializer<K> keySerializer,
+			int numberOfKeyGroups,
+			KeyGroupRange keyGroupRange,
+			TaskKvStateRegistry kvStateRegistry) throws Exception;
+
+	/**
+	 * Creates a new {@link OperatorStateBackend} that can be used for storing operator state.
+	 * 
+	 * <p>Operator state is state that is associated with parallel operator (or function) instances,
+	 * rather than with keys.
+	 * 
+	 * @param env The runtime environment of the executing task.
+	 * @param operatorIdentifier The identifier of the operator whose state should be stored.
+	 * 
+	 * @return The OperatorStateBackend for the operator identified by the job and operator identifier.
+	 * 
+	 * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
+	 */
+	OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f15603d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
index 9ff2fa8..7f1dd18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
@@ -19,28 +19,45 @@
 package org.apache.flink.runtime.state;
 
 /**
- * Base of all types that represent checkpointed state. Specializations are for
- * example {@link StateHandle StateHandles} (directly resolve to state).
+ * Base of all handles that represent checkpointed state in some form. The object may hold
+ * the (small) state directly, or contain a file path (state is in the file), or contain the
+ * metadata to access the state stored in some external database.
  *
- * <p>State objects define how to:
- * <ul>
- *     <li><b>Discard State</b>: The {@link #discardState()} method defines how state is permanently
- *         disposed/deleted. After that method call, state may not be recoverable any more.</li>
- * </ul>
+ * <p>State objects define how to {@link #discardState() discard state} and how to access the
+ * {@link #getStateSize() size of the state}.
+ * 
+ * <p>State Objects are transported via RPC between <i>JobManager</i> and
+ * <i>TaskManager</i> and must be {@link java.io.Serializable serializable} to support that.
+ * 
+ * <p>Some State Objects are stored in the checkpoint/savepoint metadata. For long-term
+ * compatibility, they are not stored via {@link java.io.Serializable Java Serialization},
+ * but through custom serializers.
  */
 public interface StateObject extends java.io.Serializable {
 
 	/**
 	 * Discards the state referred to by this handle, to free up resources in
-	 * the persistent storage. This method is called when the handle will not be
-	 * used any more.
+	 * the persistent storage. This method is called when the state represented by this
+	 * object will not be used any more.
 	 */
 	void discardState() throws Exception;
 
 	/**
-	 * Returns the size of the state in bytes.
-	 *
-	 * <p>If the the size is not known, return {@code 0}.
+	 * Returns the size of the state in bytes. If the size is not known, this
+	 * method should return {@code 0}.
+	 * 
+	 * <p>The values produced by this method are only used for informational purposes and
+	 * for metrics/monitoring. If this method returns wrong values, the checkpoints and recovery
+	 * will still behave correctly. However, efficiency may be impacted (wrong space pre-allocation)
+	 * and functionality that depends on metrics (like monitoring) will be impacted.
+	 * 
+	 * <p>Note for implementors: This method should not perform any I/O operations
+	 * while obtaining the state size (hence it does not declare throwing an {@code IOException}).
+	 * Instead, the state size should be stored in the state object, or should be computable from
+	 * the state stored in this object.
+	 * The reason is that this method is called frequently by several parts of the checkpointing
+	 * mechanism, and issuing I/O requests from this method accumulates a heavy I/O load on the
+	 * storage system at higher scale.
 	 *
 	 * @return Size of the state in bytes.
 	 */


[6/6] flink git commit: [hotfix] [tests] Fix minor JavaDoc link errors

Posted by se...@apache.org.
[hotfix] [tests] Fix minor JavaDoc link errors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c9c04d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c9c04d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c9c04d1

Branch: refs/heads/master
Commit: 0c9c04d19472064b5406dff907671ef723cf438f
Parents: f15603d
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 20 20:52:20 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 21 01:16:27 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/core/testutils/OneShotLatch.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c04d1/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 1afe952..eb730fc 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -45,7 +45,7 @@ public final class OneShotLatch {
 	}
 
 	/**
-	 * Waits until {@link OneShotLatch#trigger())} is called. Once {@code trigger()} has been called this
+	 * Waits until {@link OneShotLatch#trigger()} is called. Once {@code trigger()} has been called this
 	 * call will always return immediately.
 	 * 
 	 * @throws InterruptedException Thrown if the thread is interrupted while waiting.
@@ -59,7 +59,7 @@ public final class OneShotLatch {
 	}
 
 	/**
-	 * Waits until {@link OneShotLatch#trigger())} is called. Once {@code #trigger()} has been called this
+	 * Waits until {@link OneShotLatch#trigger()} is called. Once {@code trigger()} has been called this
 	 * call will always return immediately.
 	 * 
 	 * <p>If the latch is not triggered within the given timeout, a {@code TimeoutException}