You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:42 UTC
[1/8] flink git commit: [FLINK-5962] [checkpoints] Remove scheduled
cancel-task from timer queue to prevent memory leaks
Repository: flink
Updated Branches:
refs/heads/master d160b5e56 -> 24408e190
[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks
This closes #3548
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70252f34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70252f34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70252f34
Branch: refs/heads/master
Commit: 70252f3468916758e8bc456bbf482549c38ad7ff
Parents: afd36f9
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 16:44:41 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 79 +++++++++++++-------
.../runtime/checkpoint/PendingCheckpoint.java | 38 ++++++++++
.../checkpoint/CheckpointCoordinatorTest.java | 16 +++-
.../checkpoint/PendingCheckpointTest.java | 18 +++++
4 files changed, 123 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0592e3d..cc60837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,9 +50,10 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -134,7 +136,7 @@ public class CheckpointCoordinator {
private final int maxConcurrentCheckpointAttempts;
/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
- private final Timer timer;
+ private final ScheduledThreadPoolExecutor timer;
/** Actor that receives status updates from the execution graph this coordinator works for */
private JobStatusListener jobStatusListener;
@@ -142,7 +144,8 @@ public class CheckpointCoordinator {
/** The number of consecutive failed trigger attempts */
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
- private ScheduledTrigger currentPeriodicTrigger;
+ /** A handle to the current periodic trigger, to cancel it when necessary */
+ private ScheduledFuture<?> currentPeriodicTrigger;
/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
private long lastCheckpointCompletionNanos;
@@ -218,7 +221,13 @@ public class CheckpointCoordinator {
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
- this.timer = new Timer("Checkpoint Timer", true);
+ this.timer = new ScheduledThreadPoolExecutor(1,
+ new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
+
+ // make sure the timer internally cleans up and does not hold onto stale scheduled tasks
+ this.timer.setRemoveOnCancelPolicy(true);
+ this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
if (externalizeSettings.externalizeCheckpoints()) {
LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
@@ -265,7 +274,7 @@ public class CheckpointCoordinator {
triggerRequestQueued = false;
// shut down the thread that handles the timeouts and pending triggers
- timer.cancel();
+ timer.shutdownNow();
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
@@ -392,7 +401,7 @@ public class CheckpointCoordinator {
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -404,13 +413,14 @@ public class CheckpointCoordinator {
if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
- ScheduledTrigger trigger = new ScheduledTrigger();
// Reassign the new trigger to the currentPeriodicTrigger
- currentPeriodicTrigger = trigger;
- timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+ currentPeriodicTrigger = timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
@@ -483,7 +493,7 @@ public class CheckpointCoordinator {
}
// schedule the timer that will clean up the expired checkpoints
- TimerTask canceller = new TimerTask() {
+ final Runnable canceller = new Runnable() {
@Override
public void run() {
synchronized (lock) {
@@ -519,7 +529,7 @@ public class CheckpointCoordinator {
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -531,14 +541,15 @@ public class CheckpointCoordinator {
if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
- ScheduledTrigger trigger = new ScheduledTrigger();
// Reassign the new trigger to the currentPeriodicTrigger
- currentPeriodicTrigger = trigger;
- timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+ currentPeriodicTrigger = timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
@@ -546,7 +557,15 @@ public class CheckpointCoordinator {
LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
pendingCheckpoints.put(checkpointID, checkpoint);
- timer.schedule(canceller, checkpointTimeout);
+
+ ScheduledFuture<?> cancellerHandle = timer.schedule(
+ canceller,
+ checkpointTimeout, TimeUnit.MILLISECONDS);
+
+ if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+ // checkpoint is already disposed!
+ cancellerHandle.cancel(false);
+ }
}
// end of lock scope
@@ -866,20 +885,25 @@ public class CheckpointCoordinator {
// trigger the checkpoint from the trigger timer, to finish the work of this thread before
// starting with the next checkpoint
- ScheduledTrigger trigger = new ScheduledTrigger();
if (periodicScheduling) {
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
}
- currentPeriodicTrigger = trigger;
- timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
+ currentPeriodicTrigger = timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ 0L, baseInterval, TimeUnit.MILLISECONDS);
}
else {
- timer.schedule(trigger, 0L);
+ timer.execute(new ScheduledTrigger());
}
}
}
+ @VisibleForTesting
+ int getNumScheduledTasks() {
+ return timer.getQueue().size();
+ }
+
// --------------------------------------------------------------------------------------------
// Checkpoint State Restoring
// --------------------------------------------------------------------------------------------
@@ -1006,8 +1030,9 @@ public class CheckpointCoordinator {
stopCheckpointScheduler();
periodicScheduling = true;
- currentPeriodicTrigger = new ScheduledTrigger();
- timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
+ currentPeriodicTrigger = timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ baseInterval, baseInterval, TimeUnit.MILLISECONDS);
}
}
@@ -1017,7 +1042,7 @@ public class CheckpointCoordinator {
periodicScheduling = false;
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
@@ -1050,7 +1075,7 @@ public class CheckpointCoordinator {
// ------------------------------------------------------------------------
- private class ScheduledTrigger extends TimerTask {
+ private final class ScheduledTrigger implements Runnable {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 5ca6040..b7eb037 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -46,6 +46,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -99,6 +100,8 @@ public class PendingCheckpoint {
@Nullable
private PendingCheckpointStats statsCallback;
+ private volatile ScheduledFuture<?> cancellerHandle;
+
// --------------------------------------------------------------------------------------------
public PendingCheckpoint(
@@ -197,6 +200,27 @@ public class PendingCheckpoint {
this.statsCallback = trackerCallback;
}
+ /**
+ * Sets the handle for the canceller to this pending checkpoint.
+ *
+ * @return true, if the handle was set, false, if the checkpoint is already disposed;
+ */
+ public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
+ synchronized (lock) {
+ if (this.cancellerHandle == null) {
+ if (!discarded) {
+ this.cancellerHandle = cancellerHandle;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ else {
+ throw new IllegalStateException("A canceller handle was already set");
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// Progress and Completion
// ------------------------------------------------------------------------
@@ -490,10 +514,24 @@ public class PendingCheckpoint {
discarded = true;
notYetAcknowledgedTasks.clear();
acknowledgedTasks.clear();
+ cancelCanceller();
}
}
}
+ private void cancelCanceller() {
+ try {
+ final ScheduledFuture<?> canceller = this.cancellerHandle;
+ if (canceller != null) {
+ canceller.cancel(false);
+ }
+ }
+ catch (Exception e) {
+ // this code should not throw exceptions
+ LOG.warn("Error while cancelling checkpoint timeout task", e);
+ }
+ }
+
/**
* Reports a failed checkpoint with the given optional cause.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 1691370..d8bba59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest {
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ // we have one task scheduled that will cancel after timeout
+ assertEquals(1, coord.getNumScheduledTasks());
+
long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
@@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest {
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
assertTrue(checkpoint.isDiscarded());
+ // the canceler is also removed
+ assertEquals(0, coord.getNumScheduledTasks());
+
// validate that we have no new pending checkpoint
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest {
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, coord.getNumScheduledTasks());
// trigger the first checkpoint. this should succeed
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest {
// validate that we have a pending checkpoint
assertEquals(2, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(2, coord.getNumScheduledTasks());
Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
long checkpoint1Id = it.next().getKey();
@@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest {
}
// decline checkpoint from one of the tasks, this should cancel the checkpoint
- // and trigger a new one
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
assertTrue(checkpoint1.isDiscarded());
// validate that we have only one pending checkpoint left
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, coord.getNumScheduledTasks());
// validate that it is the same second checkpoint from earlier
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
@@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest {
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, coord.getNumScheduledTasks());
// trigger the first checkpoint. this should succeed
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest {
// validate that we have a pending checkpoint
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, coord.getNumScheduledTasks());
long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
@@ -556,6 +566,9 @@ public class CheckpointCoordinatorTest {
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ // the canceler should be removed now
+ assertEquals(0, coord.getNumScheduledTasks());
+
// validate that the relevant tasks got a confirmation message
{
verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
@@ -580,6 +593,7 @@ public class CheckpointCoordinatorTest {
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, coord.getNumScheduledTasks());
CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
assertEquals(jid, successNew.getJobId());
http://git-wip-us.apache.org/repos/asf/flink/blob/70252f34/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6f04f39..55b5fe0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -287,6 +288,23 @@ public class PendingCheckpointTest {
}
}
+ @Test
+ public void testSetCanceller() {
+ final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true);
+
+ PendingCheckpoint aborted = createPendingCheckpoint(props, null);
+ aborted.abortDeclined();
+ assertTrue(aborted.isDiscarded());
+ assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
+
+ PendingCheckpoint pending = createPendingCheckpoint(props, null);
+ ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
+
+ assertTrue(pending.setCancellerHandle(canceller));
+ pending.abortDeclined();
+ verify(canceller).cancel(false);
+ }
+
// ------------------------------------------------------------------------
private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
[8/8] flink git commit: [FLINK-4754] [checkpoints] Small followups to
the configuration of number of retained checkpoints.
Posted by se...@apache.org.
[FLINK-4754] [checkpoints] Small followups to the configuration of number of retained checkpoints.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24408e19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24408e19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24408e19
Branch: refs/heads/master
Commit: 24408e19037c8761924ca66a557dfdd8236a7be4
Parents: b46f5e0
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 16 11:17:16 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100
----------------------------------------------------------------------
docs/setup/config.md | 2 +-
.../apache/flink/configuration/CoreOptions.java | 21 +++--
.../executiongraph/ExecutionGraphBuilder.java | 11 ++-
.../ExecutionGraphDeploymentTest.java | 80 +++++++++-----------
4 files changed, 58 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 048e012..c835882 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,7 +182,7 @@ will be used under the directory specified by jobmanager.web.tmpdir.
- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints).
-- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain. This setting defines how many completed checkpoint instances can be stored in `CompletedCheckpointStore`. (Default: 1)
+- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1)
- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 1e40569..8cb4123 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -23,9 +23,10 @@ import org.apache.flink.annotation.PublicEvolving;
@PublicEvolving
public class CoreOptions {
- /**
- *
- */
+ // ------------------------------------------------------------------------
+ // process parameters
+ // ------------------------------------------------------------------------
+
public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
.key("env.java.opts")
.defaultValue("");
@@ -38,16 +39,24 @@ public class CoreOptions {
.key("env.java.opts.taskmanager")
.defaultValue("");
+ // ------------------------------------------------------------------------
+ // program
+ // ------------------------------------------------------------------------
+
public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
.key("parallelism.default")
.defaultValue(-1);
-
+
+ // ------------------------------------------------------------------------
+ // checkpoints / fault tolerance
+ // ------------------------------------------------------------------------
+
public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
.key("state.backend")
.noDefaultValue();
/** The maximum number of completed checkpoint instances to retain.*/
- public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
- .key("state.checkpoints.max-retained-checkpoints")
+ public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+ .key("state.checkpoints.num-retained")
.defaultValue(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8a35773..8471178 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -178,12 +178,17 @@ public class ExecutionGraphBuilder {
CheckpointIDCounter checkpointIdCounter;
try {
int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
- CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+ CoreOptions.MAX_RETAINED_CHECKPOINTS);
+
if (maxNumberOfCheckpointsToRetain <= 0) {
// warning and use 1 as the default value if the setting in
// state.checkpoints.max-retained-checkpoints is not greater than 0.
- log.warn("The setting for max-retained-checkpoints is not a positive number.");
- maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+ log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
+ CoreOptions.MAX_RETAINED_CHECKPOINTS.key(),
+ maxNumberOfCheckpointsToRetain,
+ CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
+
+ maxNumberOfCheckpointsToRetain = CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
}
completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
http://git-wip-us.apache.org/repos/asf/flink/blob/24408e19/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 57b549b..7f5811a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,7 +33,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
@@ -446,61 +445,50 @@ public class ExecutionGraphDeploymentTest {
assertEquals(JobStatus.FAILED, eg.getState());
}
+ // ------------------------------------------------------------------------
+ // retained checkpoints config test
+ // ------------------------------------------------------------------------
+
@Test
- public void testSettingDefaultMaxNumberOfCheckpointsToRetain() {
- try {
- final Configuration jobManagerConfig = new Configuration();
+ public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
+ final Configuration jobManagerConfig = new Configuration();
- final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
- assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+ assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
}
@Test
- public void testSettingMaxNumberOfCheckpointsToRetain() {
- try {
- final int maxNumberOfCheckpointsToRetain = 10;
- final Configuration jobManagerConfig = new Configuration();
- jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
- maxNumberOfCheckpointsToRetain);
+ public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
- final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+ final int maxNumberOfCheckpointsToRetain = 10;
+ final Configuration jobManagerConfig = new Configuration();
+ jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+ maxNumberOfCheckpointsToRetain);
- assertEquals(maxNumberOfCheckpointsToRetain,
- eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+ assertEquals(maxNumberOfCheckpointsToRetain,
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
}
@Test
- public void testSettingIllegalMaxNumberOfCheckpointsToRetain() {
- try {
- final int negativeMaxNumberOfCheckpointsToRetain = -10;
+ public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
- final Configuration jobManagerConfig = new Configuration();
- jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
- negativeMaxNumberOfCheckpointsToRetain);
+ final int negativeMaxNumberOfCheckpointsToRetain = -10;
- final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+ final Configuration jobManagerConfig = new Configuration();
+ jobManagerConfig.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS,
+ negativeMaxNumberOfCheckpointsToRetain);
- assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
- eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
- eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+ assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+
+ assertEquals(CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue(),
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
}
private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
@@ -567,14 +555,14 @@ public class ExecutionGraphDeploymentTest {
}
private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
- final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+ final ScheduledExecutorService executor = TestingUtils.defaultExecutor();
final JobID jobId = new JobID();
final JobGraph jobGraph = new JobGraph(jobId, "test");
jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
- new ArrayList<JobVertexID>(1),
- new ArrayList<JobVertexID>(1),
- new ArrayList<JobVertexID>(1),
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
+ Collections.<JobVertexID>emptyList(),
100,
10 * 60 * 1000,
0,
@@ -592,7 +580,7 @@ public class ExecutionGraphDeploymentTest {
new ProgrammedSlotProvider(1),
getClass().getClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
- Time.minutes(10),
+ Time.seconds(10),
new NoRestartStrategy(),
new UnregisteredMetricsGroup(),
1,
[7/8] flink git commit: [FLINK-4754] [checkpoints] Make number of
retained checkpoints user configurable
Posted by se...@apache.org.
[FLINK-4754] [checkpoints] Make number of retained checkpoints user configurable
This closes #3374
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b46f5e05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b46f5e05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b46f5e05
Branch: refs/heads/master
Commit: b46f5e050bdd77fe6e501bad20466d8777218131
Parents: 6b5e1f6
Author: Tony Wei <to...@gmail.com>
Authored: Mon Feb 20 18:30:24 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100
----------------------------------------------------------------------
docs/setup/config.md | 2 +
.../apache/flink/configuration/CoreOptions.java | 5 +
.../checkpoint/CheckpointRecoveryFactory.java | 3 +-
.../checkpoint/CompletedCheckpointStore.java | 5 +
.../StandaloneCheckpointRecoveryFactory.java | 5 +-
.../StandaloneCompletedCheckpointStore.java | 5 +
.../ZooKeeperCheckpointRecoveryFactory.java | 4 +-
.../ZooKeeperCompletedCheckpointStore.java | 5 +
.../executiongraph/ExecutionGraphBuilder.java | 12 ++-
.../CheckpointCoordinatorFailureTest.java | 5 +
.../ExecutionGraphDeploymentTest.java | 101 +++++++++++++++++++
.../jobmanager/JobManagerHARecoveryTest.java | 7 +-
12 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 013e56a..048e012 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -182,6 +182,8 @@ will be used under the directory specified by jobmanager.web.tmpdir.
- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints]({{ site.baseurl }}/setup/checkpoints.html#externalized-checkpoints).
+- `state.checkpoints.max-retained-checkpoints`: The maximum number of completed checkpoint instances to retain. This setting defines how many completed checkpoint instances can be stored in `CompletedCheckpointStore`. (Default: 1)
+
- `high-availability.zookeeper.storageDir`: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named `recovery.zookeeper.storageDir`.
- `blob.storage.directory`: Directory for storing blobs (such as user JARs) on the TaskManagers.
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 4e30ceb..1e40569 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -45,4 +45,9 @@ public class CoreOptions {
public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
.key("state.backend")
.noDefaultValue();
+
+ /** The maximum number of completed checkpoint instances to retain.*/
+ public static final ConfigOption<Integer> STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS = ConfigOptions
+ .key("state.checkpoints.max-retained-checkpoints")
+ .defaultValue(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index 0c7dfa7..3fb1385 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -44,10 +44,11 @@ public interface CheckpointRecoveryFactory {
* Creates a {@link CompletedCheckpointStore} instance for a job.
*
* @param jobId Job ID to recover checkpoints for
+ * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
* @param userClassLoader User code class loader of the job
* @return {@link CompletedCheckpointStore} instance for the job
*/
- CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+ CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index e91e038..9c2b199 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -73,6 +73,11 @@ public interface CompletedCheckpointStore {
int getNumberOfRetainedCheckpoints();
/**
+ * Returns the max number of retained checkpoints.
+ */
+ int getMaxNumberOfRetainedCheckpoints();
+
+ /**
* This method returns whether the completed checkpoint store requires checkpoints to be
* externalized. Externalized checkpoints have their meta data persisted, which the checkpoint
* store can exploit (for example by simply pointing the persisted metadata).
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 57785ce..2d2cc2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -37,11 +37,10 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
}
@Override
- public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+ public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {
- return new StandaloneCompletedCheckpointStore(
- CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+ return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index a0248b2..6c752f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -84,6 +84,11 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
}
@Override
+ public int getMaxNumberOfRetainedCheckpoints() {
+ return maxNumberOfCheckpointsToRetain;
+ }
+
+ @Override
public void shutdown(JobStatus jobStatus) throws Exception {
try {
LOG.info("Shutting down");
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 09bfa8c..481559b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -59,11 +59,11 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
}
@Override
- public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader)
+ public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader)
throws Exception {
return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId,
- NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, executor);
+ maxNumberOfCheckpointsToRetain, executor);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 7a167cb..1319c27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -252,6 +252,11 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
}
@Override
+ public int getMaxNumberOfRetainedCheckpoints() {
+ return maxNumberOfCheckpointsToRetain;
+ }
+
+ @Override
public void shutdown(JobStatus jobStatus) throws Exception {
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Shutting down");
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ec7103c..8a35773 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -176,7 +177,16 @@ public class ExecutionGraphBuilder {
CompletedCheckpointStore completedCheckpoints;
CheckpointIDCounter checkpointIdCounter;
try {
- completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, classLoader);
+ int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
+ CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS);
+ if (maxNumberOfCheckpointsToRetain <= 0) {
+ // warning and use 1 as the default value if the setting in
+ // state.checkpoints.max-retained-checkpoints is not greater than 0.
+ log.warn("The setting for max-retained-checkpoints is not a positive number.");
+ maxNumberOfCheckpointsToRetain = CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue();
+ }
+
+ completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 9517257..340e2a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -136,6 +136,11 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
}
@Override
+ public int getMaxNumberOfRetainedCheckpoints() {
+ return 1;
+ }
+
+ @Override
public boolean requiresExternalizedCheckpoints() {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 30824e0..57b549b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -22,6 +22,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotEquals;
import java.util.ArrayList;
import java.util.Arrays;
@@ -32,14 +33,20 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -51,8 +58,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.operators.BatchTask;
@@ -62,6 +72,7 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
+import org.slf4j.LoggerFactory;
public class ExecutionGraphDeploymentTest {
@@ -435,6 +446,63 @@ public class ExecutionGraphDeploymentTest {
assertEquals(JobStatus.FAILED, eg.getState());
}
+ @Test
+ public void testSettingDefaultMaxNumberOfCheckpointsToRetain() {
+ try {
+ final Configuration jobManagerConfig = new Configuration();
+
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+ assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSettingMaxNumberOfCheckpointsToRetain() {
+ try {
+ final int maxNumberOfCheckpointsToRetain = 10;
+ final Configuration jobManagerConfig = new Configuration();
+ jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
+ maxNumberOfCheckpointsToRetain);
+
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+ assertEquals(maxNumberOfCheckpointsToRetain,
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSettingIllegalMaxNumberOfCheckpointsToRetain() {
+ try {
+ final int negativeMaxNumberOfCheckpointsToRetain = -10;
+
+ final Configuration jobManagerConfig = new Configuration();
+ jobManagerConfig.setInteger(CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS,
+ negativeMaxNumberOfCheckpointsToRetain);
+
+ final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
+
+ assertNotEquals(negativeMaxNumberOfCheckpointsToRetain,
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+ assertEquals((int) CoreOptions.STATE_BACKEND_MAX_RETAINED_CHECKPOINTS_OPTIONS.defaultValue(),
+ eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
final JobID jobId = new JobID();
@@ -497,4 +565,37 @@ public class ExecutionGraphDeploymentTest {
throw new Exception();
}
}
+
+ private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
+ final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+ final JobID jobId = new JobID();
+ final JobGraph jobGraph = new JobGraph(jobId, "test");
+ jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+ new ArrayList<JobVertexID>(1),
+ new ArrayList<JobVertexID>(1),
+ new ArrayList<JobVertexID>(1),
+ 100,
+ 10 * 60 * 1000,
+ 0,
+ 1,
+ ExternalizedCheckpointSettings.none(),
+ null,
+ false));
+
+ return ExecutionGraphBuilder.buildGraph(
+ null,
+ jobGraph,
+ configuration,
+ executor,
+ executor,
+ new ProgrammedSlotProvider(1),
+ getClass().getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.minutes(10),
+ new NoRestartStrategy(),
+ new UnregisteredMetricsGroup(),
+ 1,
+ LoggerFactory.getLogger(getClass()));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b46f5e05/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 115b06c..32358c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -485,6 +485,11 @@ public class JobManagerHARecoveryTest {
}
@Override
+ public int getMaxNumberOfRetainedCheckpoints() {
+ return 1;
+ }
+
+ @Override
public boolean requiresExternalizedCheckpoints() {
return false;
}
@@ -509,7 +514,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception {
+ public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
return store;
}
[6/8] flink git commit: [FLINK-4422] [clients] Convert time interval
measurements to System.nanoTime() in 'flink-clients'
Posted by se...@apache.org.
[FLINK-4422] [clients] Convert time interval measurements to System.nanoTime() in 'flink-clients'
This closes #3384
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b5e1f68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b5e1f68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b5e1f68
Branch: refs/heads/master
Commit: 6b5e1f68a9b78901f0af57f446b465a7b03a88bd
Parents: 70252f3
Author: Jin Mingjian <ji...@gmail.com>
Authored: Tue Feb 21 22:43:54 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:27 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/client/program/ClientConnectionTest.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b5e1f68/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 4eb5269..fc24a9d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -115,13 +115,13 @@ public class ClientConnectionTest {
try {
// wait until the caller is successful, for at most the given time
- long now = System.currentTimeMillis();
- long deadline = now + MAX_DELAY;
+ long now = System.nanoTime();
+ long deadline = now + MAX_DELAY * 1_000_000;
synchronized (error) {
while (invoker.isAlive() && error.get() == null && now < deadline) {
error.wait(1000);
- now = System.currentTimeMillis();
+ now = System.nanoTime();
}
}
[5/8] flink git commit: [FLINK-5756] [rocksdb] Add mini benchmarks to
reproduce 'merge' performance problems
Posted by se...@apache.org.
[FLINK-5756] [rocksdb] Add mini benchmarks to reproduce 'merge' performance problems
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/677b508a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/677b508a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/677b508a
Branch: refs/heads/master
Commit: 677b508a962c5c7df9308ac3531e799cddec27f6
Parents: d160b5e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 19:13:07 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100
----------------------------------------------------------------------
.../ListViaMergeSpeedMiniBenchmark.java | 104 ++++++++++++++++
.../ListViaRangeSpeedMiniBenchmark.java | 121 +++++++++++++++++++
2 files changed, 225 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..2a530e1
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+
+public class ListViaMergeSpeedMiniBenchmark {
+
+ public static void main(String[] args) throws Exception {
+ final File rocksDir = new File("/tmp/rdb");
+ FileUtils.deleteDirectory(rocksDir);
+
+ final Options options = new Options()
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setLevelCompactionDynamicLevelBytes(true)
+ .setIncreaseParallelism(4)
+ .setUseFsync(false)
+ .setMaxOpenFiles(-1)
+ .setDisableDataSync(true)
+ .setCreateIfMissing(true)
+ .setMergeOperator(new StringAppendOperator());
+
+ final WriteOptions write_options = new WriteOptions()
+ .setSync(false)
+ .setDisableWAL(true);
+
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
+
+ final String key = "key";
+ final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ final int num = 50000;
+
+ // ----- insert -----
+ System.out.println("begin insert");
+
+ final long beginInsert = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ rocksDB.merge(write_options, keyBytes, valueBytes);
+ }
+ final long endInsert = System.nanoTime();
+ System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+ // ----- read (attempt 1) -----
+
+ final byte[] resultHolder = new byte[num * (valueBytes.length + 2)];
+ final long beginGet1 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet1 = System.nanoTime();
+
+ System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms");
+
+ // ----- read (attempt 2) -----
+
+ final long beginGet2 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet2 = System.nanoTime();
+
+ System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms");
+
+ // ----- compact -----
+ System.out.println("compacting...");
+ final long beginCompact = System.nanoTime();
+ rocksDB.compactRange();
+ final long endCompact = System.nanoTime();
+
+ System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms");
+
+ // ----- read (attempt 3) -----
+
+ final long beginGet3 = System.nanoTime();
+ rocksDB.get(keyBytes, resultHolder);
+ final long endGet3 = System.nanoTime();
+
+ System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/677b508a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
new file mode 100644
index 0000000..793a35b
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.util.FileUtils;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class ListViaRangeSpeedMiniBenchmark {
+
+ public static void main(String[] args) throws Exception {
+ final File rocksDir = new File("/tmp/rdb");
+ FileUtils.deleteDirectory(rocksDir);
+
+ final Options options = new Options()
+ .setCompactionStyle(CompactionStyle.LEVEL)
+ .setLevelCompactionDynamicLevelBytes(true)
+ .setIncreaseParallelism(4)
+ .setUseFsync(false)
+ .setMaxOpenFiles(-1)
+ .setDisableDataSync(true)
+ .setCreateIfMissing(true)
+ .setMergeOperator(new StringAppendOperator());
+
+ final WriteOptions write_options = new WriteOptions()
+ .setSync(false)
+ .setDisableWAL(true);
+
+ final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());
+
+ final String key = "key";
+ final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+ final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);
+
+ final Unsafe unsafe = MemoryUtils.UNSAFE;
+ final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+
+ final int num = 50000;
+ System.out.println("begin insert");
+
+ final long beginInsert = System.nanoTime();
+ for (int i = 0; i < num; i++) {
+ unsafe.putInt(keyTemplate, offset, i);
+ rocksDB.put(write_options, keyTemplate, valueBytes);
+ }
+ final long endInsert = System.nanoTime();
+ System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");
+
+ final byte[] resultHolder = new byte[num * valueBytes.length];
+
+ final long beginGet = System.nanoTime();
+
+ final RocksIterator iterator = rocksDB.newIterator();
+ int pos = 0;
+
+ // seek to start
+ unsafe.putInt(keyTemplate, offset, 0);
+ iterator.seek(keyTemplate);
+
+ // mark end
+ unsafe.putInt(keyTemplate, offset, -1);
+
+ // iterate
+ while (iterator.isValid()) {
+ byte[] currKey = iterator.key();
+ if (samePrefix(keyBytes, currKey)) {
+ byte[] currValue = iterator.value();
+ System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
+ pos += currValue.length;
+ iterator.next();
+ }
+ else {
+ break;
+ }
+ }
+
+ final long endGet = System.nanoTime();
+
+ System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
+ }
+
+ private static boolean samePrefix(byte[] prefix, byte[] key) {
+ for (int i = 0; i < prefix.length; i++) {
+ if (prefix[i] != key [i]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
[3/8] flink git commit: [FLINK-5692] [core] Add an Option to
Deactivate Kryo Fallback for Serializers
Posted by se...@apache.org.
[FLINK-5692] [core] Add an Option to Deactivate Kryo Fallback for Serializers
This closes #3373
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f99aae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f99aae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f99aae1
Branch: refs/heads/master
Commit: 0f99aae1e1f8b693c2ba79a061046bc042113f0b
Parents: 677b508
Author: Jin Mingjian <ji...@gmail.com>
Authored: Tue Feb 21 11:57:21 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100
----------------------------------------------------------------------
docs/dev/types_serialization.md | 7 +++++
.../flink/api/common/ExecutionConfig.java | 29 ++++++++++++++++++++
.../api/java/typeutils/GenericTypeInfo.java | 6 ++++
.../flink/api/common/ExecutionConfigTest.java | 25 +++++++++++++++++
.../graph/StreamingJobGraphGeneratorTest.java | 8 +++++-
5 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index e723c33..20ee071 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -306,6 +306,13 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ
There are different variants of these methods available.
+If you do not want to fall back to Kryo and further make sure that you have provided your own custom serializers for all POJOs explicitly, set
+{% highlight java %}
+env.getConfig().disableGenericTypes();
+{% endhighlight %}
+
+If generic types disabled, an {@link UnsupportedOperationException} will be thrown when Flink tries to fall back to the default Kryo serializer logic in the runtime.
+
## Defining Type Information using a Factory
A type information factory allows for plugging-in user-defined type information into the Flink type system.
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 32ea0a3..3bd91c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -109,6 +109,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private boolean forceKryo = false;
+ private boolean disableGenericTypes = false;
+
private boolean objectReuse = false;
private boolean autoTypeRegistrationEnabled = true;
@@ -519,6 +521,31 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
}
/**
+ * Enable generic types.
+ *
+ * @see ExecutionConfig#disableGenericTypes()
+ */
+ public void enableGenericTypes() {
+ disableGenericTypes = false;
+ }
+
+ /**
+ * Disable generic types to make sure that you have provided your own custom serializers for
+ * all POJOs explicitly.
+ *
+ * If generic types disabled,
+ * an {@link UnsupportedOperationException} will be thrown when Flink
+ * tries to fall back to the default Kryo serializer logic in the runtime.
+ */
+ public void disableGenericTypes() {
+ disableGenericTypes = true;
+ }
+
+ public boolean hasGenericTypesDisabled() {
+ return disableGenericTypes;
+ }
+
+ /**
* Force Flink to use the AvroSerializer for POJOs.
*/
public void enableForceAvro() {
@@ -804,6 +831,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
((restartStrategyConfiguration == null && other.restartStrategyConfiguration == null) ||
(null != restartStrategyConfiguration && restartStrategyConfiguration.equals(other.restartStrategyConfiguration))) &&
forceKryo == other.forceKryo &&
+ disableGenericTypes == other.disableGenericTypes &&
objectReuse == other.objectReuse &&
autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled &&
forceAvro == other.forceAvro &&
@@ -830,6 +858,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
parallelism,
restartStrategyConfiguration,
forceKryo,
+ disableGenericTypes,
objectReuse,
autoTypeRegistrationEnabled,
forceAvro,
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index bc4e87a..a4cea31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -81,6 +81,12 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
@Override
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+ if (config.hasGenericTypesDisabled()) {
+ throw new UnsupportedOperationException(
+ "Generic types are disabled for POJOs serialization, but type " + this.typeClass +
+ " is treated as a generic type.");
+ }
+
return new KryoSerializer<T>(this.typeClass, config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 883ee6c..4956a9a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -18,6 +18,10 @@
package org.apache.flink.api.common;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.junit.Test;
import java.util.Arrays;
@@ -25,6 +29,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class ExecutionConfigTest {
@@ -64,4 +69,24 @@ public class ExecutionConfigTest {
assertEquals(parallelism, config.getParallelism());
}
+ @Test
+ public void testForceCustomSerializerCheck() {
+ ExecutionConfig conf = new ExecutionConfig();
+ TypeInformation<Object> typeInfo = new GenericTypeInfo<Object>(Object.class);
+ TypeSerializer<Object> serializer = typeInfo.createSerializer(conf);
+ assertTrue(serializer instanceof KryoSerializer);
+
+ conf.disableGenericTypes();
+ boolean createSerializerFailed = false;
+ try {
+ typeInfo.createSerializer(conf);
+ } catch (UnsupportedOperationException e) {
+ createSerializerFailed = true;
+ } catch (Throwable t) {
+ fail("Unexpected exception thrown: " + t.getMessage());
+ }
+
+ assertTrue(createSerializerFailed);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7c51bc2..968b1c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -55,7 +55,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamGraph streamingJob = new StreamGraph(env);
StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
- boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
+ boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
int dop = 1 + r.nextInt(10);
ExecutionConfig config = streamingJob.getExecutionConfig();
@@ -74,6 +74,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
} else {
config.disableForceKryo();
}
+ if(disableGenericTypes) {
+ config.disableGenericTypes();
+ } else {
+ config.enableGenericTypes();
+ }
if(objectReuseEnabled) {
config.enableObjectReuse();
} else {
@@ -106,6 +111,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
+ assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled());
assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
assertEquals(dop, executionConfig.getParallelism());
[2/8] flink git commit: [FLINK-5692] [core] (followups) Add an Option
to Deactivate Kryo Fallback for Serializers
Posted by se...@apache.org.
[FLINK-5692] [core] (followups) Add an Option to Deactivate Kryo Fallback for Serializers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d498cbed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d498cbed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d498cbed
Branch: refs/heads/master
Commit: d498cbedfda7c5ebabbc5f8203d7518926ede423
Parents: 0f99aae
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 15:18:49 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100
----------------------------------------------------------------------
docs/dev/types_serialization.md | 10 ++++--
.../flink/api/common/ExecutionConfig.java | 35 ++++++++++++++++----
.../api/java/typeutils/GenericTypeInfo.java | 2 +-
.../flink/api/common/ExecutionConfigTest.java | 17 +++++-----
4 files changed, 45 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index 20ee071..053a839 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -306,12 +306,18 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ
There are different variants of these methods available.
-If you do not want to fall back to Kryo and further make sure that you have provided your own custom serializers for all POJOs explicitly, set
+
+## Disabling Kryo Fallback
+
+There are cases when programs may want to explicitly avoid using Kryo as a fallback for generic types. The most
+common one is wanting to ensure that all types are efficiently serialized either through Flink's own serializers,
+or via user-defined custom serializers.
+
+The setting below will raise an exception whenever a data type is encountered that would go through Kryo:
{% highlight java %}
env.getConfig().disableGenericTypes();
{% endhighlight %}
-If generic types disabled, an {@link UnsupportedOperationException} will be thrown when Flink tries to fall back to the default Kryo serializer logic in the runtime.
## Defining Type Information using a Factory
http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3bd91c7..26e6af1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -109,6 +109,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private boolean forceKryo = false;
+ /** Flag to indicate whether generic types (through Kryo) are disabled */
private boolean disableGenericTypes = false;
private boolean objectReuse = false;
@@ -521,26 +522,46 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
}
/**
- * Enable generic types.
+ * Enables the use of generic types which are serialized via Kryo.
+ *
+ * <p>Generic types are enabled by default.
*
- * @see ExecutionConfig#disableGenericTypes()
+ * @see #disableGenericTypes()
*/
public void enableGenericTypes() {
disableGenericTypes = false;
}
/**
- * Disable generic types to make sure that you have provided your own custom serializers for
- * all POJOs explicitly.
+ * Disables the use of generic types (types that would be serialized via Kryo). If this option
+ * is used, Flink will throw an {@code UnsupportedOperationException} whenever it encounters
+ * a data type that would go through Kryo for serialization.
*
- * If generic types disabled,
- * an {@link UnsupportedOperationException} will be thrown when Flink
- * tries to fall back to the default Kryo serializer logic in the runtime.
+ * <p>Disabling generic types can be helpful to eagerly find and eliminate the use of types
+ * that would go through Kryo serialization during runtime. Rather than checking types
+ * individually, using this option will throw exceptions eagerly in the places where generic
+ * types are used.
+ *
+ * <p><b>Important:</b> We recommend to use this option only during development and pre-production
+ * phases, not during actual production use. The application program and/or the input data may be
+ * such that new, previously unseen, types occur at some point. In that case, setting this option
+ * would cause the program to fail.
+ *
+ * @see #enableGenericTypes()
*/
public void disableGenericTypes() {
disableGenericTypes = true;
}
+ /**
+ * Checks whether generic types are disabled. Generic types are types that go through Kryo during
+ * serialization.
+ *
+ * <p>Generic types are enabled by default.
+ *
+ * @see #enableGenericTypes()
+ * @see #disableGenericTypes()
+ */
public boolean hasGenericTypesDisabled() {
return disableGenericTypes;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index a4cea31..136b7d7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -83,7 +83,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
if (config.hasGenericTypesDisabled()) {
throw new UnsupportedOperationException(
- "Generic types are disabled for POJOs serialization, but type " + this.typeClass +
+ "Generic types have been disabled in the ExecutionConfig and type " + this.typeClass.getName() +
" is treated as a generic type.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d498cbed/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 4956a9a..d000ff9 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -70,23 +70,22 @@ public class ExecutionConfigTest {
}
@Test
- public void testForceCustomSerializerCheck() {
+ public void testDisableGenericTypes() {
ExecutionConfig conf = new ExecutionConfig();
TypeInformation<Object> typeInfo = new GenericTypeInfo<Object>(Object.class);
+
+ // by default, generic types are supported
TypeSerializer<Object> serializer = typeInfo.createSerializer(conf);
assertTrue(serializer instanceof KryoSerializer);
+ // expect an exception when generic types are disabled
conf.disableGenericTypes();
- boolean createSerializerFailed = false;
try {
typeInfo.createSerializer(conf);
- } catch (UnsupportedOperationException e) {
- createSerializerFailed = true;
- } catch (Throwable t) {
- fail("Unexpected exception thrown: " + t.getMessage());
+ fail("should have failed with an exception");
+ }
+ catch (UnsupportedOperationException e) {
+ // expected
}
-
- assertTrue(createSerializerFailed);
}
-
}
[4/8] flink git commit: [hotfix] [core] Fix/cleanup serialization
test for ExecutionConfig
Posted by se...@apache.org.
[hotfix] [core] Fix/cleanup serialization test for ExecutionConfig
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afd36f98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afd36f98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afd36f98
Branch: refs/heads/master
Commit: afd36f9814ee282df8e3a58e846911f6efa54c61
Parents: d498cbe
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 15:25:20 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfigTest.java | 76 ++++++++++++++++++-
.../graph/StreamingJobGraphGeneratorTest.java | 79 +-------------------
2 files changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index d000ff9..7e98604 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -22,12 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
+
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -39,17 +45,17 @@ public class ExecutionConfigTest {
List<Class<?>> types = Arrays.<Class<?>>asList(Double.class, Integer.class, Double.class);
List<Class<?>> expectedTypes = Arrays.<Class<?>>asList(Double.class, Integer.class);
- for(Class<?> tpe: types) {
+ for (Class<?> tpe: types) {
config.registerKryoType(tpe);
}
int counter = 0;
- for(Class<?> tpe: config.getRegisteredKryoTypes()){
+ for (Class<?> tpe: config.getRegisteredKryoTypes()){
assertEquals(tpe, expectedTypes.get(counter++));
}
- assertTrue(counter == expectedTypes.size());
+ assertEquals(expectedTypes.size(), counter);
}
@Test
@@ -88,4 +94,68 @@ public class ExecutionConfigTest {
// expected
}
}
+
+ @Test
+ public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
+ final Random r = new Random();
+
+ final int parallelism = 1 + r.nextInt(10);
+ final boolean closureCleanerEnabled = r.nextBoolean(),
+ forceAvroEnabled = r.nextBoolean(),
+ forceKryoEnabled = r.nextBoolean(),
+ disableGenericTypes = r.nextBoolean(),
+ objectReuseEnabled = r.nextBoolean(),
+ sysoutLoggingEnabled = r.nextBoolean();
+
+ final ExecutionConfig config = new ExecutionConfig();
+
+ if (closureCleanerEnabled) {
+ config.enableClosureCleaner();
+ } else {
+ config.disableClosureCleaner();
+ }
+ if (forceAvroEnabled) {
+ config.enableForceAvro();
+ } else {
+ config.disableForceAvro();
+ }
+ if (forceKryoEnabled) {
+ config.enableForceKryo();
+ } else {
+ config.disableForceKryo();
+ }
+ if (disableGenericTypes) {
+ config.disableGenericTypes();
+ } else {
+ config.enableGenericTypes();
+ }
+ if (objectReuseEnabled) {
+ config.enableObjectReuse();
+ } else {
+ config.disableObjectReuse();
+ }
+ if (sysoutLoggingEnabled) {
+ config.enableSysoutLogging();
+ } else {
+ config.disableSysoutLogging();
+ }
+ config.setParallelism(parallelism);
+
+ final ExecutionConfig copy1 = CommonTestUtils.createCopySerializable(config);
+ final ExecutionConfig copy2 = new SerializedValue<>(config).deserializeValue(getClass().getClassLoader());
+
+ assertNotNull(copy1);
+ assertNotNull(copy2);
+
+ assertEquals(config, copy1);
+ assertEquals(config, copy2);
+
+ assertEquals(closureCleanerEnabled, copy1.isClosureCleanerEnabled());
+ assertEquals(forceAvroEnabled, copy1.isForceAvroEnabled());
+ assertEquals(forceKryoEnabled, copy1.isForceKryoEnabled());
+ assertEquals(disableGenericTypes, copy1.hasGenericTypesDisabled());
+ assertEquals(objectReuseEnabled, copy1.isObjectReuseEnabled());
+ assertEquals(sysoutLoggingEnabled, copy1.isSysoutLoggingEnabled());
+ assertEquals(parallelism, copy1.getParallelism());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 968b1c9..5f1973c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.graph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -27,96 +26,20 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("serial")
public class StreamingJobGraphGeneratorTest extends TestLogger {
-
- @Test
- public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
- final long seed = System.currentTimeMillis();
- final Random r = new Random(seed);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- StreamGraph streamingJob = new StreamGraph(env);
- StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
-
- boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
- int dop = 1 + r.nextInt(10);
-
- ExecutionConfig config = streamingJob.getExecutionConfig();
- if(closureCleanerEnabled) {
- config.enableClosureCleaner();
- } else {
- config.disableClosureCleaner();
- }
- if(forceAvroEnabled) {
- config.enableForceAvro();
- } else {
- config.disableForceAvro();
- }
- if(forceKryoEnabled) {
- config.enableForceKryo();
- } else {
- config.disableForceKryo();
- }
- if(disableGenericTypes) {
- config.disableGenericTypes();
- } else {
- config.enableGenericTypes();
- }
- if(objectReuseEnabled) {
- config.enableObjectReuse();
- } else {
- config.disableObjectReuse();
- }
- if(sysoutLoggingEnabled) {
- config.enableSysoutLogging();
- } else {
- config.disableSysoutLogging();
- }
- config.setParallelism(dop);
-
- JobGraph jobGraph = compiler.createJobGraph();
-
- final String EXEC_CONFIG_KEY = "runtime.config";
-
- InstantiationUtil.writeObjectToConfig(jobGraph.getSerializedExecutionConfig(),
- jobGraph.getJobConfiguration(),
- EXEC_CONFIG_KEY);
-
- SerializedValue<ExecutionConfig> serializedExecutionConfig = InstantiationUtil.readObjectFromConfig(
- jobGraph.getJobConfiguration(),
- EXEC_CONFIG_KEY,
- Thread.currentThread().getContextClassLoader());
- assertNotNull(serializedExecutionConfig);
-
- ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(getClass().getClassLoader());
-
- assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
- assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
- assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
- assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled());
- assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
- assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
- assertEquals(dop, executionConfig.getParallelism());
- }
-
@Test
public void testParallelismOneNotChained() {