You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:42 UTC

[1/8] flink git commit: [FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks

Repository: flink
Updated Branches:
  refs/heads/master d160b5e56 -> 24408e190


[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks

This closes #3548


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70252f34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70252f34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70252f34

Branch: refs/heads/master
Commit: 70252f3468916758e8bc456bbf482549c38ad7ff
Parents: afd36f9
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 16:44:41 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 79 +++++++++++++-------
 .../runtime/checkpoint/PendingCheckpoint.java   | 38 ++++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   | 16 +++-
 .../checkpoint/PendingCheckpointTest.java       | 18 +++++
 4 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d..cc60837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
 
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +50,10 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -134,7 +136,7 @@ public class CheckpointCoordinator {
 	private final int maxConcurrentCheckpointAttempts;
 
 	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
-	private final Timer timer;
+	private final ScheduledThreadPoolExecutor timer;
 
 	/** Actor that receives status updates from the execution graph this coordinator works for */
 	private JobStatusListener jobStatusListener;
@@ -142,7 +144,8 @@ public class CheckpointCoordinator {
 	/** The number of consecutive failed trigger attempts */
 	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
 
-	private ScheduledTrigger currentPeriodicTrigger;
+	/** A handle to the current periodic trigger, to cancel it when necessary */
+	private ScheduledFuture<?> currentPeriodicTrigger;
 
 	/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
 	private long lastCheckpointCompletionNanos;
@@ -218,7 +221,13 @@ public class CheckpointCoordinator {
 
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 
-		this.timer = new Timer("Checkpoint Timer", true);
+		this.timer = new ScheduledThreadPoolExecutor(1,
+				new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
+
+		// make sure the timer internally cleans up and does not hold onto stale scheduled tasks
+		this.timer.setRemoveOnCancelPolicy(true);
+		this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+		this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 
 		if (externalizeSettings.externalizeCheckpoints()) {
 			LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
@@ -265,7 +274,7 @@ public class CheckpointCoordinator {
 				triggerRequestQueued = false;
 
 				// shut down the thread that handles the timeouts and pending triggers
-				timer.cancel();
+				timer.shutdownNow();
 
 				// clear and discard all pending checkpoints
 				for (PendingCheckpoint pending : pendingCheckpoints.values()) {
@@ -392,7 +401,7 @@ public class CheckpointCoordinator {
 				if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
 					triggerRequestQueued = true;
 					if (currentPeriodicTrigger != null) {
-						currentPeriodicTrigger.cancel();
+						currentPeriodicTrigger.cancel(false);
 						currentPeriodicTrigger = null;
 					}
 					return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -404,13 +413,14 @@ public class CheckpointCoordinator {
 
 				if (durationTillNextMillis > 0) {
 					if (currentPeriodicTrigger != null) {
-						currentPeriodicTrigger.cancel();
+						currentPeriodicTrigger.cancel(false);
 						currentPeriodicTrigger = null;
 					}
-					ScheduledTrigger trigger = new ScheduledTrigger();
 					// Reassign the new trigger to the currentPeriodicTrigger
-					currentPeriodicTrigger = trigger;
-					timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+					currentPeriodicTrigger = timer.scheduleAtFixedRate(
+							new ScheduledTrigger(),
+							durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
 					return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 				}
 			}
@@ -483,7 +493,7 @@ public class CheckpointCoordinator {
 			}
 
 			// schedule the timer that will clean up the expired checkpoints
-			TimerTask canceller = new TimerTask() {
+			final Runnable canceller = new Runnable() {
 				@Override
 				public void run() {
 					synchronized (lock) {
@@ -519,7 +529,7 @@ public class CheckpointCoordinator {
 						if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
 							triggerRequestQueued = true;
 							if (currentPeriodicTrigger != null) {
-								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger.cancel(false);
 								currentPeriodicTrigger = null;
 							}
 							return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -531,14 +541,15 @@ public class CheckpointCoordinator {
 
 						if (durationTillNextMillis > 0) {
 							if (currentPeriodicTrigger != null) {
-								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger.cancel(false);
 								currentPeriodicTrigger = null;
 							}
 
-							ScheduledTrigger trigger = new ScheduledTrigger();
 							// Reassign the new trigger to the currentPeriodicTrigger
-							currentPeriodicTrigger = trigger;
-							timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+							currentPeriodicTrigger = timer.scheduleAtFixedRate(
+									new ScheduledTrigger(),
+									durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
 							return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 						}
 					}
@@ -546,7 +557,15 @@ public class CheckpointCoordinator {
 					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
 
 					pendingCheckpoints.put(checkpointID, checkpoint);
-					timer.schedule(canceller, checkpointTimeout);
+
+					ScheduledFuture<?> cancellerHandle = timer.schedule(
+							canceller,
+							checkpointTimeout, TimeUnit.MILLISECONDS);
+
+					if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+						// checkpoint is already disposed!
+						cancellerHandle.cancel(false);
+					}
 				}
 				// end of lock scope
 
@@ -866,20 +885,25 @@ public class CheckpointCoordinator {
 
 			// trigger the checkpoint from the trigger timer, to finish the work of this thread before
 			// starting with the next checkpoint
-			ScheduledTrigger trigger = new ScheduledTrigger();
 			if (periodicScheduling) {
 				if (currentPeriodicTrigger != null) {
-					currentPeriodicTrigger.cancel();
+					currentPeriodicTrigger.cancel(false);
 				}
-				currentPeriodicTrigger = trigger;
-				timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
+				currentPeriodicTrigger = timer.scheduleAtFixedRate(
+						new ScheduledTrigger(),
+						0L, baseInterval, TimeUnit.MILLISECONDS);
 			}
 			else {
-				timer.schedule(trigger, 0L);
+				timer.execute(new ScheduledTrigger());
 			}
 		}
 	}
 
+	@VisibleForTesting
+	int getNumScheduledTasks() {
+		return timer.getQueue().size();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Checkpoint State Restoring
 	// --------------------------------------------------------------------------------------------
@@ -1006,8 +1030,9 @@ public class CheckpointCoordinator {
 			stopCheckpointScheduler();
 
 			periodicScheduling = true;
-			currentPeriodicTrigger = new ScheduledTrigger();
-			timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
+			currentPeriodicTrigger = timer.scheduleAtFixedRate(
+					new ScheduledTrigger(), 
+					baseInterval, baseInterval, TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -1017,7 +1042,7 @@ public class CheckpointCoordinator {
 			periodicScheduling = false;
 
 			if (currentPeriodicTrigger != null) {
-				currentPeriodicTrigger.cancel();
+				currentPeriodicTrigger.cancel(false);
 				currentPeriodicTrigger = null;
 			}
 
@@ -1050,7 +1075,7 @@ public class CheckpointCoordinator {
 
 	// ------------------------------------------------------------------------
 
-	private class ScheduledTrigger extends TimerTask {
+	private final class ScheduledTrigger implements Runnable {
 
 		@Override
 		public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 5ca6040..b7eb037 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -46,6 +46,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -99,6 +100,8 @@ public class PendingCheckpoint {
 	@Nullable
 	private PendingCheckpointStats statsCallback;
 
+	private volatile ScheduledFuture<?> cancellerHandle;
+
 	// --------------------------------------------------------------------------------------------
 
 	public PendingCheckpoint(
@@ -197,6 +200,27 @@ public class PendingCheckpoint {
 		this.statsCallback = trackerCallback;
 	}
 
+	/**
+	 * Sets the handle for the canceller of this pending checkpoint.
+	 * 
+	 * @return true, if the handle was set, false, if the checkpoint is already disposed;
+	 */
+	public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
+		synchronized (lock) {
+			if (this.cancellerHandle == null) {
+				if (!discarded) {
+					this.cancellerHandle = cancellerHandle;
+					return true;
+				} else {
+					return false;
+				}
+			}
+			else {
+				throw new IllegalStateException("A canceller handle was already set");
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Progress and Completion
 	// ------------------------------------------------------------------------
@@ -490,10 +514,24 @@ public class PendingCheckpoint {
 				discarded = true;
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();
+				cancelCanceller();
 			}
 		}
 	}
 
+	private void cancelCanceller() {
+		try {
+			final ScheduledFuture<?> canceller = this.cancellerHandle;
+			if (canceller != null) {
+				canceller.cancel(false);
+			}
+		}
+		catch (Exception e) {
+			// this code should not throw exceptions
+			LOG.warn("Error while cancelling checkpoint timeout task", e);
+		}
+	}
+
 	/**
 	 * Reports a failed checkpoint with the given optional cause.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1691370..d8bba59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest {
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
+			// we have one task scheduled that will cancel after timeout
+			assertEquals(1, coord.getNumScheduledTasks());
+
 			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
 
@@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest {
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
 			assertTrue(checkpoint.isDiscarded());
 
+			// the canceler is also removed
+			assertEquals(0, coord.getNumScheduledTasks());
+
 			// validate that we have no new pending checkpoint
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest {
 			// validate that we have a pending checkpoint
 			assertEquals(2, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, coord.getNumScheduledTasks());
 
 			Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
 			long checkpoint1Id = it.next().getKey();
@@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest {
 			}
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
-			// and trigger a new one
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
 			assertTrue(checkpoint1.isDiscarded());
 
 			// validate that we have only one pending checkpoint left
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, coord.getNumScheduledTasks());
 
 			// validate that it is the same second checkpoint from earlier
 			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
@@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest {
 			// validate that we have a pending checkpoint
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, coord.getNumScheduledTasks());
 
 			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
@@ -556,6 +566,9 @@ public class CheckpointCoordinatorTest {
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
+			// the canceler should be removed now
+			assertEquals(0, coord.getNumScheduledTasks());
+
 			// validate that the relevant tasks got a confirmation message
 			{
 				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
@@ -580,6 +593,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
 			assertEquals(jid, successNew.getJobId());

http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6f04f39..55b5fe0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -287,6 +288,23 @@ public class PendingCheckpointTest {
 		}
 	}
 
+	@Test
+	public void testSetCanceller() {
+		final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true);
+
+		PendingCheckpoint aborted = createPendingCheckpoint(props, null);
+		aborted.abortDeclined();
+		assertTrue(aborted.isDiscarded());
+		assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
+
+		PendingCheckpoint pending = createPendingCheckpoint(props, null);
+		ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
+
+		assertTrue(pending.setCancellerHandle(canceller));
+		pending.abortDeclined();
+		verify(canceller).cancel(false);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {


[8/8] flink git commit: [FLINK-4754] [checkpoints] Small followups to the configuration of number of retained checkpoints.

Posted by se...@apache.org.
[FLINK-4754] [checkpoints] Small followups to the configuration of number of retained checkpoints.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24408e19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24408e19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24408e19

Branch: refs/heads/master
Commit: 24408e19037c8761924ca66a557dfdd8236a7be4
Parents: b46f5e0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 16 11:17:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |  2 +-
 .../apache/flink/configuration/CoreOptions.java | 21 +++--
 .../executiongraph/ExecutionGraphBuilder.java   | 11 ++-
 .../ExecutionGraphDeploymentTest.java           | 80 +++++++++-----------
 4 files changed, 58 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 048e012..c835882 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,7 +182,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints).
 
-- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain. This setting defines how many completed checkpoint instances can be stored in `CompletedCheckpointStore`. (Default: 1)
+- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Having more than one allows recovery to fall back to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1)
 
 - `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 1e40569..8cb4123 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -23,9 +23,10 @@ import org.apache.flink.annotation.PublicEvolving;
 @PublicEvolving
 public class CoreOptions {
 
-	/**
-	 * 
-	 */
+	// ------------------------------------------------------------------------
+	//  process parameters
+	// ------------------------------------------------------------------------
+
 	public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
 		.key("env.java.opts")
 		.defaultValue("");
@@ -38,16 +39,24 @@ public class CoreOptions {
 		.key("env.java.opts.taskmanager")
 		.defaultValue("");
 
+	// ------------------------------------------------------------------------
+	//  program
+	// ------------------------------------------------------------------------
+
 	public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
 		.key("parallelism.default")
 		.defaultValue(-1);
-	
+
+	// ------------------------------------------------------------------------
+	//  checkpoints / fault tolerance
+	// ------------------------------------------------------------------------
+
 	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
 		.key("state.backend")
 		.noDefaultValue();
 
 	/** The maximum number of completed checkpoint instances to retain.*/
-	public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
-		.key("state.checkpoints.max-retained-checkpoints")
+	public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+		.key("state.checkpoints.num-retained")
 		.defaultValue(1);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8a35773..8471178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -178,12 +178,17 @@ public class ExecutionGraphBuilder {
 			CheckpointIDCounter checkpointIdCounter;
 			try {
 				int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
-					CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+					CoreOptions.MAX_RETAINED_CHECKPOINTS);
+
 				if (maxNumberOfCheckpointsToRetain <= 0) {
 					// warning and use 1 as the default value if the setting in
 					// state.checkpoints.max-retained-checkpoints is not greater than 0.
-					log.warn("The setting for max-retained-checkpoints is not a positive number.");
-					maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+					log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
+							CoreOptions.MAX_RETAINED_CHECKPOINTS.key(),
+							maxNumberOfCheckpointsToRetain,
+							CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
+
+					maxNumberOfCheckpointsToRetain = CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
 				}
 
 				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 57b549b..7f5811a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,7 +33,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -446,61 +445,50 @@ public class ExecutionGraphDeploymentTest {
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
 
+	// ------------------------------------------------------------------------
+	//  retained checkpoints config test
+	// ------------------------------------------------------------------------
+
 	@Test
-	public void testSettingDefaultMaxNumberOfCheckpointsToRetain() {
-		try {
-			final Configuration jobManagerConfig = new Configuration();
+	public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
+		final Configuration jobManagerConfig = new Configuration();
 
-			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
 
-			assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+		assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
 				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test
-	public void testSettingMaxNumberOfCheckpointsToRetain() {
-		try {
-			final int maxNumberOfCheckpointsToRetain = 10;
-			final Configuration jobManagerConfig = new Configuration();
-			jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
-				maxNumberOfCheckpointsToRetain);
+	public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
 
-			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+		final int maxNumberOfCheckpointsToRetain = 10;
+		final Configuration jobManagerConfig = new Configuration();
+		jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+			maxNumberOfCheckpointsToRetain);
 
-			assertEquals(maxNumberOfCheckpointsToRetain,
-				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+		assertEquals(maxNumberOfCheckpointsToRetain,
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 
 	@Test
-	public void testSettingIllegalMaxNumberOfCheckpointsToRetain() {
-		try {
-			final int negativeMaxNumberOfCheckpointsToRetain = -10;
+	public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
 
-			final Configuration jobManagerConfig = new Configuration();
-			jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
-				negativeMaxNumberOfCheckpointsToRetain);
+		final int negativeMaxNumberOfCheckpointsToRetain = -10;
 
-			final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+		final Configuration jobManagerConfig = new Configuration();
+		jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+			negativeMaxNumberOfCheckpointsToRetain);
 
-			assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
-				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-			assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
-				eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+		assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+
+		assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 
 	private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
@@ -567,14 +555,14 @@ public class ExecutionGraphDeploymentTest {
 	}
 
 	private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
-		final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+		final ScheduledExecutorService executor = TestingUtils.defaultExecutor();
 
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test");
 		jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
-			new ArrayList<JobVertexID>(1),
-			new ArrayList<JobVertexID>(1),
-			new ArrayList<JobVertexID>(1),
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
+			Collections.<JobVertexID>emptyList(),
 			100,
 			10 * 60 * 1000,
 			0,
@@ -592,7 +580,7 @@ public class ExecutionGraphDeploymentTest {
 			new ProgrammedSlotProvider(1),
 			getClass().getClassLoader(),
 			new StandaloneCheckpointRecoveryFactory(),
-			Time.minutes(10),
+			Time.seconds(10),
 			new NoRestartStrategy(),
 			new UnregisteredMetricsGroup(),
 			1,


[7/8] flink git commit: [FLINK-4754] [checkpoints] Make number of retained checkpoints user configurable

Posted by se...@apache.org.
[FLINK-4754] [checkpoints] Make number of retained checkpoints user configurable

This closes #3374


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b46f5e05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b46f5e05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b46f5e05

Branch: refs/heads/master
Commit: b46f5e050bdd77fe6e501bad20466d8777218131
Parents: 6b5e1f6
Author: Tony Wei <to...@gmail.com>
Authored: Mon Feb 20 18:30:24 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 .../apache/flink/configuration/CoreOptions.java |   5 +
 .../checkpoint/CheckpointRecoveryFactory.java   |   3 +-
 .../checkpoint/CompletedCheckpointStore.java    |   5 +
 .../StandaloneCheckpointRecoveryFactory.java    |   5 +-
 .../StandaloneCompletedCheckpointStore.java     |   5 +
 .../ZooKeeperCheckpointRecoveryFactory.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |   5 +
 .../executiongraph/ExecutionGraphBuilder.java   |  12 ++-
 .../CheckpointCoordinatorFailureTest.java       |   5 +
 .../ExecutionGraphDeploymentTest.java           | 101 +++++++++++++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |   7 +-
 12 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 013e56a..048e012 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,6 +182,8 @@ will be used under the directory specified by jobmanager.web.tmpdir.
 
 - `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints).
 
+- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain, i.e. how many completed checkpoints the `CompletedCheckpointStore` keeps. Values that are not positive are ignored with a warning, and the default is used instead. (Default: 1)
+
 - `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers.

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 4e30ceb..1e40569 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -45,4 +45,9 @@ public class CoreOptions {
 	public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
 		.key("state.backend")
 		.noDefaultValue();
+
+	/**
+	 * The maximum number of completed checkpoint instances to retain
+	 * ({@code state.checkpoints.max-retained-checkpoints}, default 1).
+	 *
+	 * <p>Values that are not positive are rejected by the ExecutionGraphBuilder, which logs a
+	 * warning and falls back to this option's default value.
+	 */
+	public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
+		.key("state.checkpoints.max-retained-checkpoints")
+		.defaultValue(1);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index 0c7dfa7..3fb1385 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -44,10 +44,11 @@ public interface CheckpointRecoveryFactory {
 	 * Creates a {@link CompletedCheckpointStore} instance for a job.
 	 *
 	 * @param jobId           Job ID to recover checkpoints for
+	 * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
 	 * @param userClassLoader User code class loader of the job
 	 * @return {@link CompletedCheckpointStore} instance for the job
 	 */
-	CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+	CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
 			throws Exception;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index e91e038..9c2b199 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -73,6 +73,11 @@ public interface CompletedCheckpointStore {
 	int getNumberOfRetainedCheckpoints();
 
 	/**
+	 * Returns the max number of retained checkpoints.
+	 */
+	int getMaxNumberOfRetainedCheckpoints();
+
+	/**
 	 * This method returns whether the completed checkpoint store requires checkpoints to be
 	 * externalized. Externalized checkpoints have their meta data persisted, which the checkpoint
 	 * store can exploit (for example by simply pointing the persisted metadata).

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 57785ce..2d2cc2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -37,11 +37,10 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
 	}
 
 	@Override
-	public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+	public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
 			throws Exception {
 
-		return new StandaloneCompletedCheckpointStore(
-				CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+		return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index a0248b2..6c752f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -84,6 +84,11 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
 	}
 
 	@Override
+	public int getMaxNumberOfRetainedCheckpoints() {
+		return maxNumberOfCheckpointsToRetain;
+	}
+
+	@Override
 	public void shutdown(JobStatus jobStatus) throws Exception {
 		try {
 			LOG.info("Shutting down");

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 09bfa8c..481559b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -59,11 +59,11 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
 	}
 
 	@Override
-	public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+	public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
 			throws Exception {
 
 		return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId,
-				NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, executor);
+				maxNumberOfCheckpointsToRetain, executor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 7a167cb..1319c27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -252,6 +252,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
 	}
 
 	@Override
+	public int getMaxNumberOfRetainedCheckpoints() {
+		return maxNumberOfCheckpointsToRetain;
+	}
+
+	@Override
 	public void shutdown(JobStatus jobStatus) throws Exception {
 		if (jobStatus.isGloballyTerminalState()) {
 			LOG.info("Shutting down");

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ec7103c..8a35773 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -176,7 +177,16 @@ public class ExecutionGraphBuilder {
 			CompletedCheckpointStore completedCheckpoints;
 			CheckpointIDCounter checkpointIdCounter;
 			try {
-				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, classLoader);
+				int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
+					CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+				if (maxNumberOfCheckpointsToRetain <= 0) {
+					// warning and use 1 as the default value if the setting in
+					// state.checkpoints.max-retained-checkpoints is not greater than 0.
+					log.warn("The setting for max-retained-checkpoints is not a positive number.");
+					maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+				}
+
+				completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
 				checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
 			}
 			catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 9517257..340e2a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -136,6 +136,11 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
 		}
 
 		@Override
+		public int getMaxNumberOfRetainedCheckpoints() {
+			return 1;
+		}
+
+		@Override
 		public boolean requiresExternalizedCheckpoints() {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 30824e0..57b549b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -22,6 +22,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotEquals;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,14 +33,20 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -51,8 +58,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.operators.BatchTask;
@@ -62,6 +72,7 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
 public class ExecutionGraphDeploymentTest {
 
@@ -435,6 +446,63 @@ public class ExecutionGraphDeploymentTest {
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
 
+	/**
+	 * Verifies that, without an explicit setting, the completed checkpoint store is created with
+	 * the default maximum number of retained checkpoints.
+	 */
+	@Test
+	public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
+		final Configuration jobManagerConfig = new Configuration();
+
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+		assertEquals(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue().intValue(),
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+	}
+
+	/**
+	 * Verifies that a positive, explicitly configured maximum number of retained checkpoints is
+	 * passed through to the completed checkpoint store.
+	 */
+	@Test
+	public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
+		final int maxNumberOfCheckpointsToRetain = 10;
+		final Configuration jobManagerConfig = new Configuration();
+		jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
+			maxNumberOfCheckpointsToRetain);
+
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+		assertEquals(maxNumberOfCheckpointsToRetain,
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+	}
+
+	/**
+	 * Verifies that a negative maximum number of retained checkpoints is not applied, and that
+	 * the completed checkpoint store falls back to the default instead.
+	 */
+	@Test
+	public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
+		final int negativeMaxNumberOfCheckpointsToRetain = -10;
+
+		final Configuration jobManagerConfig = new Configuration();
+		jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
+			negativeMaxNumberOfCheckpointsToRetain);
+
+		final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+		assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+
+		assertEquals(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue().intValue(),
+			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+	}
+
 	private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
 		final JobID jobId = new JobID();
 
@@ -497,4 +565,37 @@ public class ExecutionGraphDeploymentTest {
 			throw new Exception();
 		}
 	}
+
+	/**
+	 * Builds an {@link ExecutionGraph} for a job graph with no vertices but with snapshot
+	 * settings attached. This makes the graph builder create a checkpoint coordinator whose
+	 * completed checkpoint store is configured from {@code configuration}.
+	 *
+	 * <p>NOTE(review): the single-thread scheduled executor created here is never shut down by
+	 * this method and no reference to it is returned, so each call may leave a live thread
+	 * behind for the rest of the test run.
+	 *
+	 * @param configuration the job manager configuration handed to {@link ExecutionGraphBuilder}
+	 * @return the built execution graph
+	 * @throws Exception if building the execution graph fails
+	 */
+	private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
+		final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+		final JobID jobId = new JobID();
+		final JobGraph jobGraph = new JobGraph(jobId, "test");
+		jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+			// no vertices to trigger, acknowledge or commit: the graph is empty
+			new ArrayList<JobVertexID>(1),
+			new ArrayList<JobVertexID>(1),
+			new ArrayList<JobVertexID>(1),
+			// presumably: checkpoint interval, checkpoint timeout (10 min), min pause between
+			// checkpoints, max concurrent checkpoints -- TODO confirm against JobSnapshottingSettings
+			100,
+			10 * 60 * 1000,
+			0,
+			1,
+			ExternalizedCheckpointSettings.none(),
+			null,
+			false));
+
+		return ExecutionGraphBuilder.buildGraph(
+			null,
+			jobGraph,
+			configuration,
+			executor,
+			executor,
+			new ProgrammedSlotProvider(1),
+			getClass().getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.minutes(10),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			1,
+			LoggerFactory.getLogger(getClass()));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 115b06c..32358c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -485,6 +485,11 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
+		public int getMaxNumberOfRetainedCheckpoints() {
+			return 1;
+		}
+
+		@Override
 		public boolean requiresExternalizedCheckpoints() {
 			return false;
 		}
@@ -509,7 +514,7 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
-		public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception {
+		public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
 			return store;
 		}
 


[6/8] flink git commit: [FLINK-4422] [clients] Convert time interval measurements to System.nanoTime() in 'flink-clients'

Posted by se...@apache.org.
[FLINK-4422] [clients] Convert time interval measurements to System.nanoTime() in 'flink-clients'

This closes #3384


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b5e1f68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b5e1f68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b5e1f68

Branch: refs/heads/master
Commit: 6b5e1f68a9b78901f0af57f446b465a7b03a88bd
Parents: 70252f3
Author: Jin Mingjian <ji...@gmail.com>
Authored: Tue Feb 21 22:43:54 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/ClientConnectionTest.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b5e1f68/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 4eb5269..fc24a9d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -115,13 +115,13 @@ public class ClientConnectionTest {
 
 			try {
 				// wait until the caller is successful, for at most the given time
-				long now = System.currentTimeMillis();
-				long deadline = now + MAX_DELAY;
+				long now = System.nanoTime();
+				long deadline = now + MAX_DELAY * 1_000_000;
 
 				synchronized (error) {
 					while (invoker.isAlive() && error.get() == null && now < deadline) {
 						error.wait(1000);
-						now = System.currentTimeMillis();
+						now = System.nanoTime();
 					}
 				}
 


[5/8] flink git commit: [FLINK-5756] [rocksdb] Add mini benchmarks to reproduce 'merge' performance problems

Posted by se...@apache.org.
[FLINK-5756] [rocksdb] Add mini benchmarks to reproduce 'merge' performance problems


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677b508a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677b508a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677b508a

Branch: refs/heads/master
Commit: 677b508a962c5c7df9308ac3531e799cddec27f6
Parents: d160b5e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 19:13:07 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../ListViaMergeSpeedMiniBenchmark.java         | 104 ++++++++++++++++
 .../ListViaRangeSpeedMiniBenchmark.java         | 121 +++++++++++++++++++
 2 files changed, 225 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..2a530e1
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+public class ListViaMergeSpeedMiniBenchmark {
+
+	/**
+	 * Runs the benchmark. It appends {@code num} copies of a value to a single key using RocksDB
+	 * merge operations, then times reads of the merged value before and after a full-range
+	 * compaction. Durations are printed to stdout in milliseconds.
+	 *
+	 * <p>The database lives in {@code /tmp/rdb}, which is deleted before the run. All native
+	 * RocksDB handles opened here are released when the run finishes.
+	 */
+	public static void main(String[] args) throws Exception {
+		final File rocksDir = new File("/tmp/rdb");
+		FileUtils.deleteDirectory(rocksDir);
+
+		final String key = "key";
+		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+		final int num = 50000;
+
+		// try-with-resources closes in reverse declaration order, so the database is closed
+		// before the options it was opened with, releasing the native RocksDB handles.
+		try (Options options = new Options()
+					.setCompactionStyle(CompactionStyle.LEVEL)
+					.setLevelCompactionDynamicLevelBytes(true)
+					.setIncreaseParallelism(4)
+					.setUseFsync(false)
+					.setMaxOpenFiles(-1)
+					.setDisableDataSync(true)
+					.setCreateIfMissing(true)
+					.setMergeOperator(new StringAppendOperator());
+				WriteOptions writeOptions = new WriteOptions()
+					.setSync(false)
+					.setDisableWAL(true);
+				RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) {
+
+			// ----- insert -----
+			System.out.println("begin insert");
+
+			final long beginInsert = System.nanoTime();
+			for (int i = 0; i < num; i++) {
+				rocksDB.merge(writeOptions, keyBytes, valueBytes);
+			}
+			final long endInsert = System.nanoTime();
+			System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+			// room for every appended value plus slack for the delimiter the merge operator
+			// inserts between values
+			final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+
+			// ----- read (attempt 1) -----
+			timedGet(rocksDB, keyBytes, resultHolder);
+
+			// ----- read (attempt 2) -----
+			timedGet(rocksDB, keyBytes, resultHolder);
+
+			// ----- compact -----
+			System.out.println("compacting...");
+			final long beginCompact = System.nanoTime();
+			rocksDB.compactRange();
+			final long endCompact = System.nanoTime();
+
+			System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
+
+			// ----- read (attempt 3) -----
+			timedGet(rocksDB, keyBytes, resultHolder);
+		}
+	}
+
+	/**
+	 * Reads the value stored under {@code key} into {@code holder} and prints how long the read took.
+	 *
+	 * @throws IllegalStateException if the key is missing or its value did not fit into
+	 *         {@code holder}; in either case the printed timing would not reflect a full read
+	 */
+	private static void timedGet(RocksDB db, byte[] key, byte[] holder) throws Exception {
+		final long begin = System.nanoTime();
+		final int size = db.get(key, holder);
+		final long end = System.nanoTime();
+
+		System.out.println("end get - duration: " + ((end - begin) / 1_000_000) + " ms");
+
+		if (size == RocksDB.NOT_FOUND) {
+			throw new IllegalStateException("Key not found in RocksDB");
+		}
+		if (size > holder.length) {
+			// RocksDB copies only a partial value when the buffer is too small
+			throw new IllegalStateException("Merged value of " + size
+				+ " bytes did not fit into the " + holder.length + "-byte read buffer");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..793a35b
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class ListViaRangeSpeedMiniBenchmark {
+	
+	public static void main(String[] args) throws Exception {
+		final File rocksDir = new File("/tmp/rdb");
+		FileUtils.deleteDirectory(rocksDir);
+
+		final Options options = new Options()
+				.setCompactionStyle(CompactionStyle.LEVEL)
+				.setLevelCompactionDynamicLevelBytes(true)
+				.setIncreaseParallelism(4)
+				.setUseFsync(false)
+				.setMaxOpenFiles(-1)
+				.setDisableDataSync(true)
+				.setCreateIfMissing(true)
+				.setMergeOperator(new StringAppendOperator());
+
+		final WriteOptions write_options = new WriteOptions()
+				.setSync(false)
+				.setDisableWAL(true);
+
+		final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
+
+		final String key = "key";
+		final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+		final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+		final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+		final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+
+		final Unsafe unsafe = MemoryUtils.UNSAFE;
+		final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+
+		final int num = 50000;
+		System.out.println("begin insert");
+
+		final long beginInsert = System.nanoTime();
+		for (int i = 0; i < num; i++) {
+			unsafe.putInt(keyTemplate, offset, i);
+			rocksDB.put(write_options, keyTemplate, valueBytes);
+		}
+		final long endInsert = System.nanoTime();
+		System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+		final byte[] resultHolder = new byte[num * valueBytes.length];
+
+		final long beginGet = System.nanoTime();
+
+		final RocksIterator iterator = rocksDB.newIterator();
+		int pos = 0;
+
+		// seek to start
+		unsafe.putInt(keyTemplate, offset, 0);
+		iterator.seek(keyTemplate);
+
+		// mark end
+		unsafe.putInt(keyTemplate, offset, -1);
+
+		// iterate
+		while (iterator.isValid()) {
+			byte[] currKey = iterator.key();
+			if (samePrefix(keyBytes, currKey)) {
+				byte[] currValue = iterator.value();
+				System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+				pos += currValue.length;
+				iterator.next();
+			}
+			else {
+				break;
+			}
+		}
+
+		final long endGet = System.nanoTime();
+
+		System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
+	}
+
+	private static boolean samePrefix(byte[] prefix, byte[] key) {
+		for (int i = 0; i < prefix.length; i++) {
+			if (prefix[i] != key [i]) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+}


[3/8] flink git commit: [FLINK-5692] [core] Add an Option to Deactivate Kryo Fallback for Serializers

Posted by se...@apache.org.
[FLINK-5692] [core] Add an Option to Deactivate Kryo Fallback for Serializers

This closes #3373


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f99aae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f99aae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f99aae1

Branch: refs/heads/master
Commit: 0f99aae1e1f8b693c2ba79a061046bc042113f0b
Parents: 677b508
Author: Jin Mingjian <ji...@gmail.com>
Authored: Tue Feb 21 11:57:21 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 |  7 +++++
 .../flink/api/common/ExecutionConfig.java       | 29 ++++++++++++++++++++
 .../api/java/typeutils/GenericTypeInfo.java     |  6 ++++
 .../flink/api/common/ExecutionConfigTest.java   | 25 +++++++++++++++++
 .../graph/StreamingJobGraphGeneratorTest.java   |  8 +++++-
 5 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index e723c33..20ee071 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -306,6 +306,13 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ
 
 There are different variants of these methods available.
 
+If you do not want Flink to fall back to Kryo, for example to make sure that you have explicitly provided custom serializers for all of your types, set
+{% highlight java %}
+env.getConfig().disableGenericTypes();
+{% endhighlight %}
+
+If generic types are disabled, an `UnsupportedOperationException` will be thrown whenever Flink would otherwise fall back to the default Kryo serializer at runtime.
+
 ## Defining Type Information using a Factory
 
 A type information factory allows for plugging-in user-defined type information into the Flink type system.

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 32ea0a3..3bd91c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -109,6 +109,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
 	private boolean forceKryo = false;
 
+	private boolean disableGenericTypes = false;
+
 	private boolean objectReuse = false;
 
 	private boolean autoTypeRegistrationEnabled = true;
@@ -519,6 +521,31 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	}
 
 	/**
+	 * Enables the use of generic types, i.e. types that are serialized via Kryo.
+	 *
+	 * <p>Generic types are enabled by default.
+	 *
+	 * @see ExecutionConfig#disableGenericTypes()
+	 */
+	public void enableGenericTypes() {
+		disableGenericTypes = false;
+	}
+
+	/**
+	 * Disables generic types, to make sure that custom serializers have been provided
+	 * explicitly for all types that would otherwise fall back to Kryo.
+	 *
+	 * <p>If generic types are disabled, an {@link UnsupportedOperationException} will be
+	 * thrown when Flink tries to fall back to the default Kryo serializer in the runtime.
+	 *
+	 * @see ExecutionConfig#enableGenericTypes()
+	 */
+	public void disableGenericTypes() {
+		disableGenericTypes = true;
+	}
+
+	/**
+	 * Checks whether generic types (types serialized via Kryo) have been disabled.
+	 *
+	 * @return true if generic types are disabled, false otherwise (the default)
+	 */
+	public boolean hasGenericTypesDisabled() {
+		return disableGenericTypes;
+	}
+
+	/**
 	 * Force Flink to use the AvroSerializer for POJOs.
 	 */
 	public void enableForceAvro() {
@@ -804,6 +831,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 				((restartStrategyConfiguration == null && other.restartStrategyConfiguration == null) ||
 					(null != restartStrategyConfiguration && restartStrategyConfiguration.equals(other.restartStrategyConfiguration))) &&
 				forceKryo == other.forceKryo &&
+				disableGenericTypes == other.disableGenericTypes &&
 				objectReuse == other.objectReuse &&
 				autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled &&
 				forceAvro == other.forceAvro &&
@@ -830,6 +858,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 			parallelism,
 			restartStrategyConfiguration,
 			forceKryo,
+			disableGenericTypes,
 			objectReuse,
 			autoTypeRegistrationEnabled,
 			forceAvro,

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index bc4e87a..a4cea31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -81,6 +81,12 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 	@Override
 	@PublicEvolving
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+		// this type info only ever produces a KryoSerializer (see the return below), so it
+		// must refuse to do so when the ExecutionConfig has generic types disabled
+		if (config.hasGenericTypesDisabled()) {
+			throw new UnsupportedOperationException(
+				"Generic types have been disabled in the ExecutionConfig and type " + this.typeClass.getName() +
+				" is treated as a generic type.");
+		}
+
 		return new KryoSerializer<T>(this.typeClass, config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 883ee6c..4956a9a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.api.common;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -25,6 +29,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ExecutionConfigTest {
 
@@ -64,4 +69,24 @@ public class ExecutionConfigTest {
 		assertEquals(parallelism, config.getParallelism());
 	}
 
+	@Test
+	public void testForceCustomSerializerCheck() {
+		ExecutionConfig conf = new ExecutionConfig();
+		TypeInformation<Object> typeInfo = new GenericTypeInfo<Object>(Object.class);
+
+		// by default, generic types fall back to Kryo
+		TypeSerializer<Object> serializer = typeInfo.createSerializer(conf);
+		assertTrue(serializer instanceof KryoSerializer);
+
+		// with generic types disabled, creating the Kryo serializer must be refused;
+		// any other exception propagates so JUnit reports it with its full stack trace
+		conf.disableGenericTypes();
+		try {
+			typeInfo.createSerializer(conf);
+			fail("Expected an UnsupportedOperationException when generic types are disabled");
+		} catch (UnsupportedOperationException e) {
+			// expected
+		}
+
+		// re-enabling generic types restores the Kryo fallback
+		conf.enableGenericTypes();
+		assertTrue(typeInfo.createSerializer(conf) instanceof KryoSerializer);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7c51bc2..968b1c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -55,7 +55,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		StreamGraph streamingJob = new StreamGraph(env);
 		StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
 		
-		boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
+		boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
 		int dop = 1 + r.nextInt(10);
 		
 		ExecutionConfig config = streamingJob.getExecutionConfig();
@@ -74,6 +74,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		} else {
 			config.disableForceKryo();
 		}
+		if(disableGenericTypes) {
+			config.disableGenericTypes();
+		} else {
+			config.enableGenericTypes();
+		}
 		if(objectReuseEnabled) {
 			config.enableObjectReuse();
 		} else {
@@ -106,6 +111,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
 		assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
 		assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
+		assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled());
 		assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
 		assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
 		assertEquals(dop, executionConfig.getParallelism());


[2/8] flink git commit: [FLINK-5692] [core] (followups) Add an Option to Deactivate Kryo Fallback for Serializers

Posted by se...@apache.org.
[FLINK-5692] [core] (followups) Add an Option to Deactivate Kryo Fallback for Serializers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d498cbed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d498cbed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d498cbed

Branch: refs/heads/master
Commit: d498cbedfda7c5ebabbc5f8203d7518926ede423
Parents: 0f99aae
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 15:18:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 | 10 ++++--
 .../flink/api/common/ExecutionConfig.java       | 35 ++++++++++++++++----
 .../api/java/typeutils/GenericTypeInfo.java     |  2 +-
 .../flink/api/common/ExecutionConfigTest.java   | 17 +++++-----
 4 files changed, 45 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 20ee071..053a839 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -306,12 +306,18 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ
 
 There are different variants of these methods available.
 
-If you do not want to fall back to Kryo and further make sure that you have provided your own custom serializers for all POJOs explicitly, set
+
+## Disabling Kryo Fallback
+
+There are cases when programs may want to explicitly avoid using Kryo as a fallback for generic types. The most
+common one is wanting to ensure that all types are efficiently serialized either through Flink's own serializers,
+or via user-defined custom serializers.
+
+The setting below will raise an exception whenever a data type is encountered that would go through Kryo:
 {% highlight java %}
 env.getConfig().disableGenericTypes();
 {% endhighlight %}
 
-If generic types disabled, an {@link UnsupportedOperationException} will be thrown when Flink tries to fall back to the default Kryo serializer logic in the runtime.
 
 ## Defining Type Information using a Factory
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3bd91c7..26e6af1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -109,6 +109,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
 	private boolean forceKryo = false;
 
+	/** Flag to indicate whether generic types (through Kryo) are disabled */
 	private boolean disableGenericTypes = false;
 
 	private boolean objectReuse = false;
@@ -521,26 +522,46 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	}
 
 	/**
-	 * Enable generic types.
+	 * Enables the use of generic types which are serialized via Kryo.
+	 * 
+	 * <p>Generic types are enabled by default.
 	 *
-	 * @see ExecutionConfig#disableGenericTypes()
+	 * @see #disableGenericTypes()
 	 */
 	public void enableGenericTypes() {
 		disableGenericTypes = false;
 	}
 
 	/**
-	 * Disable generic types to make sure that you have provided your own custom serializers for
-	 * all POJOs explicitly.
+	 * Disables the use of generic types (types that would be serialized via Kryo). If this option
+	 * is used, Flink will throw an {@code UnsupportedOperationException} whenever it encounters
+	 * a data type that would go through Kryo for serialization.
 	 *
-	 * If generic types disabled,
-	 * an {@link UnsupportedOperationException} will be thrown when Flink
-	 * tries to fall back to the default Kryo serializer logic in the runtime.
+	 * <p>Disabling generic types can be helpful to eagerly find and eliminate the use of types
+	 * that would go through Kryo serialization during runtime. Rather than checking types
+	 * individually, using this option will throw exceptions eagerly in the places where generic
+	 * types are used.
+	 * 
+	 * <p><b>Important:</b> We recommend using this option only during development and pre-production
+	 * phases, not during actual production use. The application program and/or the input data may be
+	 * such that new, previously unseen, types occur at some point. In that case, setting this option
+	 * would cause the program to fail.
+	 * 
+	 * @see #enableGenericTypes()
 	 */
 	public void disableGenericTypes() {
 		disableGenericTypes = true;
 	}
 
+	/**
+	 * Checks whether generic types have been disabled. Generic types are types that go through Kryo
+	 * during serialization.
+	 * 
+	 * <p>Generic types are enabled by default.
+	 * 
+	 * @see #enableGenericTypes()
+	 * @see #disableGenericTypes()
+	 */
 	public boolean hasGenericTypesDisabled() {
 		return disableGenericTypes;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index a4cea31..136b7d7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -83,7 +83,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
 		if (config.hasGenericTypesDisabled()) {
 			throw new UnsupportedOperationException(
-				"Generic types are disabled for POJOs serialization, but type " + this.typeClass +
+				"Generic types have been disabled in the ExecutionConfig and type " + this.typeClass.getName() +
 				" is treated as a generic type.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 4956a9a..d000ff9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -70,23 +70,22 @@ public class ExecutionConfigTest {
 	}
 
 	@Test
-	public void testForceCustomSerializerCheck() {
+	public void testDisableGenericTypes() {
 		ExecutionConfig conf = new ExecutionConfig();
 		TypeInformation<Object> typeInfo = new GenericTypeInfo<Object>(Object.class);
+
+		// by default, generic types are supported
 		TypeSerializer<Object> serializer = typeInfo.createSerializer(conf);
 		assertTrue(serializer instanceof KryoSerializer);
 
+		// expect an exception when generic types are disabled
 		conf.disableGenericTypes();
-		boolean createSerializerFailed = false;
 		try {
 			typeInfo.createSerializer(conf);
-		} catch (UnsupportedOperationException e) {
-			createSerializerFailed = true;
-		} catch (Throwable t) {
-			fail("Unexpected exception thrown: " + t.getMessage());
+			fail("should have failed with an exception");
+		}
+		catch (UnsupportedOperationException e) {
+			// expected
 		}
-
-		assertTrue(createSerializerFailed);
 	}
-
 }


[4/8] flink git commit: [hotfix] [core] Fix/cleanup serialization test for ExecutionConfig

Posted by se...@apache.org.
[hotfix] [core] Fix/cleanup serialization test for ExecutionConfig


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afd36f98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afd36f98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afd36f98

Branch: refs/heads/master
Commit: afd36f9814ee282df8e3a58e846911f6efa54c61
Parents: d498cbe
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 15:25:20 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfigTest.java   | 76 ++++++++++++++++++-
 .../graph/StreamingJobGraphGeneratorTest.java   | 79 +-------------------
 2 files changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index d000ff9..7e98604 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -22,12 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -39,17 +45,17 @@ public class ExecutionConfigTest {
 		List<Class<?>> types = Arrays.<Class<?>>asList(Double.class, Integer.class, Double.class);
 		List<Class<?>> expectedTypes = Arrays.<Class<?>>asList(Double.class, Integer.class);
 
-		for(Class<?> tpe: types) {
+		for (Class<?> tpe: types) {
 			config.registerKryoType(tpe);
 		}
 
 		int counter = 0;
 
-		for(Class<?> tpe: config.getRegisteredKryoTypes()){
+		for (Class<?> tpe: config.getRegisteredKryoTypes()){
 			assertEquals(tpe, expectedTypes.get(counter++));
 		}
 
-		assertTrue(counter == expectedTypes.size());
+		assertEquals(expectedTypes.size(), counter);
 	}
 
 	@Test
@@ -88,4 +94,68 @@ public class ExecutionConfigTest {
 			// expected
 		}
 	}
+
+	@Test
+	public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
+		// the settings are randomized; the seed is included in every assertion message so that
+		// a failing combination of settings can be reproduced
+		final long seed = System.currentTimeMillis();
+		final Random r = new Random(seed);
+		final String msg = "random seed: " + seed;
+
+		final int parallelism = 1 + r.nextInt(10);
+		final boolean closureCleanerEnabled = r.nextBoolean(),
+				forceAvroEnabled = r.nextBoolean(),
+				forceKryoEnabled = r.nextBoolean(),
+				disableGenericTypes = r.nextBoolean(),
+				objectReuseEnabled = r.nextBoolean(),
+				sysoutLoggingEnabled = r.nextBoolean();
+
+		final ExecutionConfig config = new ExecutionConfig();
+
+		if (closureCleanerEnabled) {
+			config.enableClosureCleaner();
+		} else {
+			config.disableClosureCleaner();
+		}
+		if (forceAvroEnabled) {
+			config.enableForceAvro();
+		} else {
+			config.disableForceAvro();
+		}
+		if (forceKryoEnabled) {
+			config.enableForceKryo();
+		} else {
+			config.disableForceKryo();
+		}
+		if (disableGenericTypes) {
+			config.disableGenericTypes();
+		} else {
+			config.enableGenericTypes();
+		}
+		if (objectReuseEnabled) {
+			config.enableObjectReuse();
+		} else {
+			config.disableObjectReuse();
+		}
+		if (sysoutLoggingEnabled) {
+			config.enableSysoutLogging();
+		} else {
+			config.disableSysoutLogging();
+		}
+		config.setParallelism(parallelism);
+
+		// round-trip through both plain Java serialization and SerializedValue
+		final ExecutionConfig copy1 = CommonTestUtils.createCopySerializable(config);
+		final ExecutionConfig copy2 = new SerializedValue<>(config).deserializeValue(getClass().getClassLoader());
+
+		assertNotNull(msg, copy1);
+		assertNotNull(msg, copy2);
+
+		assertEquals(msg, config, copy1);
+		assertEquals(msg, config, copy2);
+
+		assertEquals(msg, closureCleanerEnabled, copy1.isClosureCleanerEnabled());
+		assertEquals(msg, forceAvroEnabled, copy1.isForceAvroEnabled());
+		assertEquals(msg, forceKryoEnabled, copy1.isForceKryoEnabled());
+		assertEquals(msg, disableGenericTypes, copy1.hasGenericTypesDisabled());
+		assertEquals(msg, objectReuseEnabled, copy1.isObjectReuseEnabled());
+		assertEquals(msg, sysoutLoggingEnabled, copy1.isSysoutLoggingEnabled());
+		assertEquals(msg, parallelism, copy1.getParallelism());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 968b1c9..5f1973c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -27,96 +26,20 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
-	
-	@Test
-	public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
-		final long seed = System.currentTimeMillis();
-		final Random r = new Random(seed);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		StreamGraph streamingJob = new StreamGraph(env);
-		StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
-		
-		boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
-		int dop = 1 + r.nextInt(10);
-		
-		ExecutionConfig config = streamingJob.getExecutionConfig();
-		if(closureCleanerEnabled) {
-			config.enableClosureCleaner();
-		} else {
-			config.disableClosureCleaner();
-		}
-		if(forceAvroEnabled) {
-			config.enableForceAvro();
-		} else {
-			config.disableForceAvro();
-		}
-		if(forceKryoEnabled) {
-			config.enableForceKryo();
-		} else {
-			config.disableForceKryo();
-		}
-		if(disableGenericTypes) {
-			config.disableGenericTypes();
-		} else {
-			config.enableGenericTypes();
-		}
-		if(objectReuseEnabled) {
-			config.enableObjectReuse();
-		} else {
-			config.disableObjectReuse();
-		}
-		if(sysoutLoggingEnabled) {
-			config.enableSysoutLogging();
-		} else {
-			config.disableSysoutLogging();
-		}
-		config.setParallelism(dop);
-		
-		JobGraph jobGraph = compiler.createJobGraph();
-
-		final String EXEC_CONFIG_KEY = "runtime.config";
-
-		InstantiationUtil.writeObjectToConfig(jobGraph.getSerializedExecutionConfig(),
-			jobGraph.getJobConfiguration(),
-			EXEC_CONFIG_KEY);
-
-		SerializedValue<ExecutionConfig> serializedExecutionConfig = InstantiationUtil.readObjectFromConfig(
-				jobGraph.getJobConfiguration(),
-				EXEC_CONFIG_KEY,
-				Thread.currentThread().getContextClassLoader());
 
-		assertNotNull(serializedExecutionConfig);
-
-		ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(getClass().getClassLoader());
-
-		assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
-		assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
-		assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
-		assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled());
-		assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
-		assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
-		assertEquals(dop, executionConfig.getParallelism());
-	}
-	
 	@Test
 	public void testParallelismOneNotChained() {