You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/24 21:12:50 UTC

[1/6] flink git commit: [FLINK-4453] [docs] Scala code example in Window documentation shows Java

Repository: flink
Updated Branches:
  refs/heads/master 58850f292 -> addad1af4


[FLINK-4453] [docs] Scala code example in Window documentation shows Java

This closes #2411


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42f65e4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42f65e4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42f65e4b

Branch: refs/heads/master
Commit: 42f65e4b93ef7f71b6252bc9c664bee727fd4278
Parents: 58850f2
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Wed Aug 24 11:04:25 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 19:27:28 2016 +0200

----------------------------------------------------------------------
 docs/dev/windows.md | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42f65e4b/docs/dev/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/windows.md b/docs/dev/windows.md
index 084a2ee..63505c3 100644
--- a/docs/dev/windows.md
+++ b/docs/dev/windows.md
@@ -409,19 +409,18 @@ public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
+trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
 
   /**
-   * Evaluates the window and outputs none or several elements.
-   *
-   * @param key The key for which this window is evaluated.
-   * @param window The window that is being evaluated.
-   * @param input The elements in the window being evaluated.
-   * @param out A collector for emitting elements.
-   *
-   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
-   */
-  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
+    * Evaluates the window and outputs none or several elements.
+    *
+    * @param key    The key for which this window is evaluated.
+    * @param window The window that is being evaluated.
+    * @param input  The elements in the window being evaluated.
+    * @param out    A collector for emitting elements.
+    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+    */
+  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
 }
 {% endhighlight %}
 </div>


[5/6] flink git commit: [FLINK-4457] Make ExecutionGraph independent of actors.

Posted by se...@apache.org.
[FLINK-4457] Make ExecutionGraph independent of actors.

This introduces the JobStatusListener and ExecutionStatusListener interfaces,
which replace the ActorRefs and ActorGateways previously used as listeners.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/635c8693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/635c8693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/635c8693

Branch: refs/heads/master
Commit: 635c869326cc77e4199e4d8ee597aed69ed16cd2
Parents: 4e9d177
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 19:12:07 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 21:19:04 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 34 +++-----
 .../CheckpointCoordinatorDeActivator.java       | 45 ++++-------
 .../runtime/executiongraph/ExecutionGraph.java  | 81 ++++++++++----------
 .../executiongraph/ExecutionStatusListener.java | 54 +++++++++++++
 .../executiongraph/JobStatusListener.java       | 39 ++++++++++
 .../executiongraph/StatusListenerMessenger.java | 70 +++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 12 ++-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  3 -
 .../LeaderChangeJobRecoveryTest.java            | 73 ++++--------------
 .../flink/core/testutils/OneShotLatch.java      | 55 +++++++++++++
 .../flink/core/testutils/OneShotLatchTest.java  | 55 +++++++++++++
 11 files changed, 360 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b710324..3619f48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
 import akka.dispatch.Futures;
 
 import org.apache.flink.api.common.JobID;
@@ -31,8 +28,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -57,7 +53,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -137,7 +132,7 @@ public class CheckpointCoordinator {
 	private final Timer timer;
 
 	/** Actor that receives status updates from the execution graph this coordinator works for */
-	private ActorGateway jobStatusListener;
+	private JobStatusListener jobStatusListener;
 
 	/** The number of consecutive failed trigger attempts */
 	private int numUnsuccessfulCheckpointsTriggers;
@@ -266,12 +261,6 @@ public class CheckpointCoordinator {
 				// shut down the thread that handles the timeouts and pending triggers
 				timer.cancel();
 
-				// make sure that the actor does not linger
-				if (jobStatusListener != null) {
-					jobStatusListener.tell(PoisonPill.getInstance());
-					jobStatusListener = null;
-				}
-
 				// clear and discard all pending checkpoints
 				for (PendingCheckpoint pending : pendingCheckpoints.values()) {
 					pending.abortError(new Exception("Checkpoint Coordinator is shutting down"));
@@ -903,7 +892,7 @@ public class CheckpointCoordinator {
 	//  Periodic scheduling of checkpoints
 	// --------------------------------------------------------------------------------------------
 
-	public void startCheckpointScheduler() throws Exception {
+	public void startCheckpointScheduler() {
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
@@ -918,7 +907,7 @@ public class CheckpointCoordinator {
 		}
 	}
 
-	public void stopCheckpointScheduler() throws Exception {
+	public void stopCheckpointScheduler() {
 		synchronized (lock) {
 			triggerRequestQueued = false;
 			periodicScheduling = false;
@@ -929,10 +918,14 @@ public class CheckpointCoordinator {
 			}
 
 			for (PendingCheckpoint p : pendingCheckpoints.values()) {
-				p.abortError(new Exception("Checkpoint Coordinator is suspending."));
+				try {
+					p.abortError(new Exception("Checkpoint Coordinator is suspending."));
+				} catch (Throwable t) {
+					LOG.error("Error while disposing pending checkpoint", t);
+				}
 			}
-			pendingCheckpoints.clear();
 
+			pendingCheckpoints.clear();
 			numUnsuccessfulCheckpointsTriggers = 0;
 		}
 	}
@@ -941,17 +934,14 @@ public class CheckpointCoordinator {
 	//  job status listener that schedules / cancels periodic checkpoints
 	// ------------------------------------------------------------------------
 
-	public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
+	public JobStatusListener createActivatorDeactivator() {
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
 			}
 
 			if (jobStatusListener == null) {
-				Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
-
-				// wrap the ActorRef in a AkkaActorGateway to support message decoration
-				jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
+				jobStatusListener = new CheckpointCoordinatorDeActivator(this);
 			}
 
 			return jobStatusListener;

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index 7e26f71..2e23d6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -18,51 +18,32 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This actor listens to changes in the JobStatus and activates or deactivates the periodic
  * checkpoint scheduler.
  */
-public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
+public class CheckpointCoordinatorDeActivator implements JobStatusListener {
 
 	private final CheckpointCoordinator coordinator;
-	private final UUID leaderSessionID;
-	
-	public CheckpointCoordinatorDeActivator(
-			CheckpointCoordinator coordinator,
-			UUID leaderSessionID) {
 
-		LOG.info("Create CheckpointCoordinatorDeActivator");
-
-		this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
-		this.leaderSessionID = leaderSessionID;
+	public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
+		this.coordinator = checkNotNull(coordinator);
 	}
 
 	@Override
-	public void handleMessage(Object message) throws Exception {
-		if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
-			JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
-			
-			if (status == JobStatus.RUNNING) {
-				// start the checkpoint scheduler
-				coordinator.startCheckpointScheduler();
-			} else {
-				// anything else should stop the trigger for now
-				coordinator.stopCheckpointScheduler();
-			}
+	public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+		if (newJobStatus == JobStatus.RUNNING) {
+			// start the checkpoint scheduler
+			coordinator.startCheckpointScheduler();
+		} else {
+			// anything else should stop the trigger for now
+			coordinator.stopCheckpointScheduler();
 		}
-		
-		// we ignore all other messages
-	}
-
-	@Override
-	public UUID getLeaderSessionID() {
-		return leaderSessionID;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 12d8e66..7a94c0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.ActorSystem;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -50,15 +48,16 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -75,12 +74,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The execution graph is the central data structure that coordinates the distributed
  * execution of a data flow. It keeps representations of each parallel task, each
@@ -151,12 +150,12 @@ public class ExecutionGraph {
 	 * accessible on all nodes in the cluster. */
 	private final List<URL> requiredClasspaths;
 
-	/** Listeners that receive messages when the entire job switches it status (such as from
-	 * RUNNING to FINISHED) */
-	private final List<ActorGateway> jobStatusListenerActors;
+	/** Listeners that receive messages when the entire job switches it status
+	 * (such as from RUNNING to FINISHED) */
+	private final List<JobStatusListener> jobStatusListeners;
 
 	/** Listeners that receive messages whenever a single task execution changes its status */
-	private final List<ActorGateway> executionListenerActors;
+	private final List<ExecutionStatusListener> executionListeners;
 
 	/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
 	 * the execution graph transitioned into a certain state. The index into this array is the
@@ -284,8 +283,8 @@ public class ExecutionGraph {
 		this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
 		this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
 
-		this.jobStatusListenerActors  = new CopyOnWriteArrayList<ActorGateway>();
-		this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
+		this.jobStatusListeners  = new CopyOnWriteArrayList<>();
+		this.executionListeners = new CopyOnWriteArrayList<>();
 
 		this.stateTimestamps = new long[JobStatus.values().length];
 		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
@@ -345,8 +344,6 @@ public class ExecutionGraph {
 			List<ExecutionJobVertex> verticesToTrigger,
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
-			ActorSystem actorSystem,
-			UUID leaderSessionID,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore checkpointStore,
 			SavepointStore savepointStore,
@@ -388,8 +385,7 @@ public class ExecutionGraph {
 
 		// the periodic checkpoint scheduler is activated and deactivated as a result of
 		// job status changes (running -> on, all other states -> off)
-		registerJobStatusListener(
-				checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
+		registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
 	}
 
 	/**
@@ -935,8 +931,8 @@ public class ExecutionGraph {
 		intermediateResults.clear();
 		currentExecutions.clear();
 		requiredJarFiles.clear();
-		jobStatusListenerActors.clear();
-		executionListenerActors.clear();
+		jobStatusListeners.clear();
+		executionListeners.clear();
 
 		isArchived = true;
 	}
@@ -1173,45 +1169,52 @@ public class ExecutionGraph {
 	//  Listeners & Observers
 	// --------------------------------------------------------------------------------------------
 
-	public void registerJobStatusListener(ActorGateway listener) {
+	public void registerJobStatusListener(JobStatusListener listener) {
 		if (listener != null) {
-			this.jobStatusListenerActors.add(listener);
+			jobStatusListeners.add(listener);
 		}
 	}
 
-	public void registerExecutionListener(ActorGateway listener) {
+	public void registerExecutionListener(ExecutionStatusListener listener) {
 		if (listener != null) {
-			this.executionListenerActors.add(listener);
+			executionListeners.add(listener);
 		}
 	}
 
 	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
-		if (jobStatusListenerActors.size() > 0) {
-			ExecutionGraphMessages.JobStatusChanged message =
-					new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(),
-							error == null ? null : new SerializedThrowable(error));
-
-			for (ActorGateway listener: jobStatusListenerActors) {
-				listener.tell(message);
+		if (jobStatusListeners.size() > 0) {
+			final long timestamp = System.currentTimeMillis();
+			final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
+
+			for (JobStatusListener listener : jobStatusListeners) {
+				try {
+					listener.jobStatusChanges(jobID, newState, timestamp, serializedError);
+				} catch (Throwable t) {
+					LOG.warn("Error while notifying JobStatusListener", t);
+				}
 			}
 		}
 	}
 
-	void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID, ExecutionState
-							newExecutionState, Throwable error)
+	void notifyExecutionChange(
+			JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
+			ExecutionState newExecutionState, Throwable error)
 	{
 		ExecutionJobVertex vertex = getJobVertex(vertexId);
 
-		if (executionListenerActors.size() > 0) {
-			String message = error == null ? null : ExceptionUtils.stringifyException(error);
-			ExecutionGraphMessages.ExecutionStateChanged actorMessage =
-					new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId,  vertex.getJobVertex().getName(),
-																	vertex.getParallelism(), subtask,
-																	executionID, newExecutionState,
-																	System.currentTimeMillis(), message);
-
-			for (ActorGateway listener : executionListenerActors) {
-				listener.tell(actorMessage);
+		if (executionListeners.size() > 0) {
+			final String message = error == null ? null : ExceptionUtils.stringifyException(error);
+			final long timestamp = System.currentTimeMillis();
+
+			for (ExecutionStatusListener listener : executionListeners) {
+				try {
+					listener.executionStatusChanged(
+							jobID, vertexId, vertex.getJobVertex().getName(),
+							vertex.getParallelism(), subtask, executionID, newExecutionState,
+							timestamp, message);
+				} catch (Throwable t) {
+					LOG.warn("Error while notifying ExecutionStatusListener", t);
+				}
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
new file mode 100644
index 0000000..6fb5a1a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * Interface for observers that monitor the status of individual task executions.
+ */
+public interface ExecutionStatusListener {
+
+	/**
+	 * Called whenever the execution status of a task changes.
+	 * 
+	 * @param jobID                  The ID of the job
+	 * @param vertexID               The ID of the task vertex
+	 * @param taskName               The name of the task
+	 * @param totalNumberOfSubTasks  The parallelism of the task
+	 * @param subtaskIndex           The subtask's parallel index
+	 * @param executionID            The ID of the execution attempt
+	 * @param newExecutionState      The status to which the task switched
+	 * @param timestamp              The timestamp when the change occurred. Informational only.
+	 * @param optionalMessage        An optional message attached to the status change, like an
+	 *                               exception message.
+	 */
+	void executionStatusChanged(
+			JobID jobID,
+			JobVertexID vertexID,
+			String taskName,
+			int totalNumberOfSubTasks,
+			int subtaskIndex,
+			ExecutionAttemptID executionID,
+			ExecutionState newExecutionState,
+			long timestamp,
+			String optionalMessage);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
new file mode 100644
index 0000000..1d97a5c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobStatusListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+/**
+ * Interface for observers that monitor the status of a job.
+ */
+public interface JobStatusListener {
+
+	/**
+	 * This method is called whenever the status of the job changes.
+	 * 
+	 * @param jobId         The ID of the job.
+	 * @param newJobStatus  The status the job switched to.
+	 * @param timestamp     The timestamp when the status transition occurred.
+	 * @param error         In case the job status switches to a failure state, this is the
+	 *                      exception that caused the failure.
+	 */
+	void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
new file mode 100644
index 0000000..01f1e75
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.ExecutionGraphMessages;
+import org.apache.flink.runtime.util.SerializedThrowable;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code JobStatusListener} and {@code ExecutionStatusListener} that sends an actor message
+ * for each status change.
+ */
+public class StatusListenerMessenger implements JobStatusListener, ExecutionStatusListener {
+
+	private final AkkaActorGateway target;
+
+	public StatusListenerMessenger(ActorRef target, UUID leaderSessionId) {
+		this.target = new AkkaActorGateway(checkNotNull(target), leaderSessionId);
+	}
+
+	@Override
+	public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+		ExecutionGraphMessages.JobStatusChanged message =
+				new ExecutionGraphMessages.JobStatusChanged(jobId, newJobStatus, timestamp,
+						error == null ? null : new SerializedThrowable(error));
+
+		target.tell(message);
+	}
+
+	@Override
+	public void executionStatusChanged(
+			JobID jobID, JobVertexID vertexID,
+			String taskName, int taskParallelism, int subtaskIndex,
+			ExecutionAttemptID executionID, ExecutionState newExecutionState,
+			long timestamp, String optionalMessage) {
+		
+		ExecutionGraphMessages.ExecutionStateChanged message = 
+				new ExecutionGraphMessages.ExecutionStateChanged(
+					jobID, vertexID, taskName, taskParallelism, subtaskIndex,
+					executionID, newExecutionState, timestamp, optionalMessage);
+
+		target.tell(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 34fed3f..0587987 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -51,7 +51,7 @@ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
+import org.apache.flink.runtime.executiongraph.{StatusListenerMessenger, ExecutionGraph, ExecutionJobVertex}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
@@ -1249,8 +1249,6 @@ class JobManager(
             triggerVertices,
             ackVertices,
             confirmVertices,
-            context.system,
-            leaderSessionID.orNull,
             checkpointIdCounter,
             completedCheckpoints,
             savepointStore,
@@ -1259,14 +1257,14 @@ class JobManager(
 
         // get notified about job status changes
         executionGraph.registerJobStatusListener(
-          new AkkaActorGateway(self, leaderSessionID.orNull))
+          new StatusListenerMessenger(self, leaderSessionID.orNull))
 
         if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
           // the sender wants to be notified about state changes
-          val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)
+          val listener  = new StatusListenerMessenger(jobInfo.client, leaderSessionID.orNull)
 
-          executionGraph.registerExecutionListener(gateway)
-          executionGraph.registerJobStatusListener(gateway)
+          executionGraph.registerExecutionListener(listener)
+          executionGraph.registerJobStatusListener(listener)
         }
       } catch {
         case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 7b05fd7..49a9449 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -40,7 +40,6 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.net.URL;
 import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.mock;
@@ -117,8 +116,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
-				system,
-				UUID.randomUUID(),
 				counter,
 				store,
 				new HeapSavepointStore(),

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 57de2cd..450f9fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -18,28 +18,28 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
-import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
@@ -113,15 +113,12 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
 		ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
 
-		TestActorGateway testActorGateway = new TestActorGateway();
-
-		executionGraph.registerJobStatusListener(testActorGateway);
+		TestJobStatusListener testListener = new TestJobStatusListener();
+		executionGraph.registerJobStatusListener(testListener);
 
 		cluster.revokeLeadership();
 
-		Future<Boolean> hasReachedTerminalState = testActorGateway.hasReachedTerminalState();
-
-		assertTrue("The job should have reached a terminal state.", Await.result(hasReachedTerminalState, timeout));
+		testListener.waitForTerminalState(30000);
 	}
 
 	public JobGraph createBlockingJob(int parallelism) {
@@ -150,59 +147,19 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 		return jobGraph;
 	}
 
-	public static class TestActorGateway implements ActorGateway {
-
-		private static final long serialVersionUID = -736146686160538227L;
-		private transient Promise<Boolean> terminalState = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-		public Future<Boolean> hasReachedTerminalState() {
-			return terminalState.future();
-		}
+	public static class TestJobStatusListener implements JobStatusListener {
 
-		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			return null;
-		}
+		private final OneShotLatch terminalStateLatch = new OneShotLatch();
 
-		@Override
-		public void tell(Object message) {
-			this.tell(message, new AkkaActorGateway(ActorRef.noSender(), null));
+		public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
+			terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
 		}
 
 		@Override
-		public void tell(Object message, ActorGateway sender) {
-			if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
-				ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
-
-				if (jobStatusChanged.newJobStatus().isGloballyTerminalState() || jobStatusChanged.newJobStatus() == JobStatus.SUSPENDED) {
-					terminalState.success(true);
-				}
+		public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
+			if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
+				terminalStateLatch.trigger();
 			}
 		}
-
-		@Override
-		public void forward(Object message, ActorGateway sender) {
-
-		}
-
-		@Override
-		public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
-			return null;
-		}
-
-		@Override
-		public String path() {
-			return null;
-		}
-
-		@Override
-		public ActorRef actor() {
-			return null;
-		}
-
-		@Override
-		public UUID leaderSessionID() {
-			return null;
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
index 54ac110..0418bf5 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/OneShotLatch.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.core.testutils;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 /**
  * Latch for synchronizing parts of code in tests. Once the latch has fired once calls to
  * {@link #await()} will return immediately in the future.
@@ -44,6 +47,8 @@ public final class OneShotLatch {
 	/**
 	 * Waits until {@link #trigger())} is called. Once {@code #trigger()} has been called this
 	 * call will always return immediately.
+	 * 
+	 * @throws InterruptedException Thrown if the thread is interrupted while waiting.
 	 */
 	public void await() throws InterruptedException {
 		synchronized (lock) {
@@ -52,4 +57,54 @@ public final class OneShotLatch {
 			}
 		}
 	}
+
+	/**
+	 * Waits until {@link #trigger()} is called. Once {@code #trigger()} has been called this
+	 * call will always return immediately.
+	 * 
+	 * <p>If the latch is not triggered within the given timeout, a {@code TimeoutException}
+	 * will be thrown after the timeout.
+	 * 
+	 * <p>A timeout value of zero means an infinite timeout and makes this call equivalent to {@link #await()}.
+	 * 
+	 * @param timeout   The value of the timeout, a value of zero indicating infinite timeout.
+	 * @param timeUnit  The unit of the timeout
+	 * 
+	 * @throws InterruptedException Thrown if the thread is interrupted while waiting.
+	 * @throws TimeoutException Thrown, if the latch is not triggered within the timeout time.
+	 */
+	public void await(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+		if (timeout < 0) {
+			throw new IllegalArgumentException("time may not be negative");
+		}
+		if (timeUnit == null) {
+			throw new NullPointerException("timeUnit");
+		}
+
+		if (timeout == 0) {
+			await();
+		} else {
+			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+			long millisToWait;
+
+			synchronized (lock) {
+				while (!triggered && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+					lock.wait(millisToWait);
+				}
+
+				if (!triggered) {
+					throw new TimeoutException();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Checks if the latch was triggered.
+	 * 
+	 * @return True, if the latch was triggered, false if not.
+	 */
+	public boolean isTriggered() {
+		return triggered;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/635c8693/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
new file mode 100644
index 0000000..575c84c
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/core/testutils/OneShotLatchTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class OneShotLatchTest {
+
+	@Test
+	public void testAwaitWithTimeout() throws Exception {
+		OneShotLatch latch = new OneShotLatch();
+		assertFalse(latch.isTriggered());
+
+		try {
+			latch.await(1, TimeUnit.MILLISECONDS);
+			fail("should fail with a TimeoutException");
+		} catch (TimeoutException e) {
+			// expected
+		}
+
+		assertFalse(latch.isTriggered());
+
+		latch.trigger();
+		assertTrue(latch.isTriggered());
+
+		latch.await(100, TimeUnit.DAYS);
+		assertTrue(latch.isTriggered());
+
+		latch.await(0, TimeUnit.MILLISECONDS);
+		assertTrue(latch.isTriggered());
+	}
+}


[4/6] flink git commit: [FLINK-4437] [checkpoints] Properly lock the triggerCheckpoint() method

Posted by se...@apache.org.
[FLINK-4437] [checkpoints] Properly lock the triggerCheckpoint() method

This introduces a trigger-lock that makes sure checkpoint trigger attempts do not overtake
each other (as may otherwise be the case for periodic checkpoints and manual savepoints).

This also fixes the evaluation of the min-delay-between-checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4da40bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4da40bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4da40bcb

Branch: refs/heads/master
Commit: 4da40bcb9ea01cb0c5e6fd0d7472dc09397f648e
Parents: 4e45659
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 14:02:47 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 19:56:17 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 219 +++++++++++--------
 .../checkpoint/CoordinatorShutdownTest.java     |   3 +-
 2 files changed, 133 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4da40bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index ff54bad..2c0e63b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -80,6 +80,13 @@ public class CheckpointCoordinator {
 	/** Coordinator-wide lock to safeguard the checkpoint updates */
 	private final Object lock = new Object();
 
+	/** Lock specially to make sure that trigger requests do not overtake each other.
+	 * This is not done with the coordinator-wide lock, because as part of triggering,
+	 * blocking operations may happen (distributed atomic counters).
+	 * Using a dedicated lock, we avoid blocking the processing of 'acknowledge/decline'
+	 * messages during that phase. */
+	private final Object triggerLock = new Object();
+
 	/** The job whose checkpoint this coordinator coordinates */
 	private final JobID job;
 
@@ -179,6 +186,12 @@ public class CheckpointCoordinator {
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
 		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
 
+		// it does not make sense to schedule checkpoints more often than the desired
+		// time between checkpoints
+		if (baseInterval < minPauseBetweenCheckpoints) {
+			baseInterval = minPauseBetweenCheckpoints;
+		}
+
 		this.job = checkNotNull(job);
 		this.baseInterval = baseInterval;
 		this.checkpointTimeout = checkpointTimeout;
@@ -202,8 +215,8 @@ public class CheckpointCoordinator {
 			// Make sure the checkpoint ID enumerator is running. Possibly
 			// issues a blocking call to ZooKeeper.
 			checkpointIDCounter.start();
-		} catch (Exception e) {
-			throw new Exception("Failed to start checkpoint ID counter: " + e.getMessage(), e);
+		} catch (Throwable t) {
+			throw new Exception("Failed to start checkpoint ID counter: " + t.getMessage(), t);
 		}
 	}
 
@@ -335,7 +348,13 @@ public class CheckpointCoordinator {
 				}
 
 				// make sure the minimum interval between checkpoints has passed
-				if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
+				long nextCheckpointEarliest = lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
+				if (nextCheckpointEarliest < 0) {
+					// overflow
+					nextCheckpointEarliest = Long.MAX_VALUE;
+				}
+
+				if (nextCheckpointEarliest > timestamp) {
 					if (currentPeriodicTrigger != null) {
 						currentPeriodicTrigger.cancel();
 						currentPeriodicTrigger = null;
@@ -343,7 +362,8 @@ public class CheckpointCoordinator {
 					ScheduledTrigger trigger = new ScheduledTrigger();
 					// Reassign the new trigger to the currentPeriodicTrigger
 					currentPeriodicTrigger = trigger;
-					timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
+					long delay = nextCheckpointEarliest - timestamp;
+					timer.scheduleAtFixedRate(trigger, delay, baseInterval);
 					return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 				}
 			}
@@ -380,105 +400,130 @@ public class CheckpointCoordinator {
 
 		// we will actually trigger this checkpoint!
 
-		lastTriggeredCheckpoint = timestamp;
-		final long checkpointID;
-		try {
-			// this must happen outside the locked scope, because it communicates
-			// with external services (in HA mode) and may block for a while.
-			checkpointID = checkpointIdCounter.getAndIncrement();
-		}
-		catch (Throwable t) {
-			int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
-			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
-			return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
-		}
+		// we lock with a special lock to make sure that trigger requests do not overtake each other.
+		// this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
+		// may issue blocking operations. Using a different lock than the coordinator-wide lock,
+		// we avoid blocking the processing of 'acknowledge/decline' messages during that time.
+		synchronized (triggerLock) {
+			final long checkpointID;
+			try {
+				// this must happen outside the coordinator-wide lock, because it communicates
+				// with external services (in HA mode) and may block for a while.
+				checkpointID = checkpointIdCounter.getAndIncrement();
+			}
+			catch (Throwable t) {
+				int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+			}
 
-		LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+			final PendingCheckpoint checkpoint = props.isSavepoint() ?
+				new PendingSavepoint(job, checkpointID, timestamp, ackTasks, userClassLoader, savepointStore) :
+				new PendingCheckpoint(job, checkpointID, timestamp, ackTasks, userClassLoader);
+
+			// schedule the timer that will clean up the expired checkpoints
+			TimerTask canceller = new TimerTask() {
+				@Override
+				public void run() {
+					try {
+						synchronized (lock) {
+							// only do the work if the checkpoint is not discarded anyways
+							// note that checkpoint completion discards the pending checkpoint object
+							if (!checkpoint.isDiscarded()) {
+								LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+	
+								checkpoint.abortExpired();
+								pendingCheckpoints.remove(checkpointID);
+								rememberRecentCheckpointId(checkpointID);
+	
+								triggerQueuedRequests();
+							}
+						}
+					}
+					catch (Throwable t) {
+						LOG.error("Exception while handling checkpoint timeout", t);
+					}
+				}
+			};
 
-		final PendingCheckpoint checkpoint = props.isSavepoint() ?
-			new PendingSavepoint(job, checkpointID, timestamp, ackTasks, userClassLoader, savepointStore) :
-			new PendingCheckpoint(job, checkpointID, timestamp, ackTasks, userClassLoader);
+			try {
+				// re-acquire the coordinator-wide lock
+				synchronized (lock) {
+					// since we released the lock in the meantime, we need to re-check
+					// that the conditions still hold.
+					if (shutdown) {
+						return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
+					}
+					else if (!props.isSavepoint()) {
+						if (triggerRequestQueued) {
+							LOG.warn("Trying to trigger another checkpoint while one was queued already");
+							return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
+						}
 
-		// schedule the timer that will clean up the expired checkpoints
-		TimerTask canceller = new TimerTask() {
-			@Override
-			public void run() {
-				try {
-					synchronized (lock) {
-						// only do the work if the checkpoint is not discarded anyways
-						// note that checkpoint completion discards the pending checkpoint object
-						if (!checkpoint.isDiscarded()) {
-							LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+						if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+							triggerRequestQueued = true;
+							if (currentPeriodicTrigger != null) {
+								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger = null;
+							}
+							return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+						}
 
-							checkpoint.abortExpired();
-							pendingCheckpoints.remove(checkpointID);
-							rememberRecentCheckpointId(checkpointID);
+						// make sure the minimum interval between checkpoints has passed
+						long nextCheckpointEarliest = lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
+						if (nextCheckpointEarliest < 0) {
+							// overflow
+							nextCheckpointEarliest = Long.MAX_VALUE;
+						}
 
-							triggerQueuedRequests();
+						if (nextCheckpointEarliest > timestamp) {
+							if (currentPeriodicTrigger != null) {
+								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger = null;
+							}
+							ScheduledTrigger trigger = new ScheduledTrigger();
+							// Reassign the new trigger to the currentPeriodicTrigger
+							currentPeriodicTrigger = trigger;
+							long delay = nextCheckpointEarliest - timestamp;
+							timer.scheduleAtFixedRate(trigger, delay, baseInterval);
+							return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 						}
 					}
-				}
-				catch (Throwable t) {
-					LOG.error("Exception while handling checkpoint timeout", t);
-				}
-			}
-		};
 
-		try {
-			// re-acquire the lock
-			synchronized (lock) {
-				// since we released the lock in the meantime, we need to re-check
-				// that the conditions still hold. this is clumsy, but it allows us to
-				// release the lock in the meantime while calls to external services are
-				// blocking progress, and still gives us early checks that skip work
-				// if no checkpoint can happen anyways
-				if (shutdown) {
-					return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
+					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+
+					lastTriggeredCheckpoint = Math.max(timestamp, lastTriggeredCheckpoint);
+					pendingCheckpoints.put(checkpointID, checkpoint);
+					timer.schedule(canceller, checkpointTimeout);
 				}
-				else if (!props.isSavepoint()) {
-					if (triggerRequestQueued) {
-						LOG.warn("Trying to trigger another checkpoint while one was queued already");
-						return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
-					}
-					else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
-						triggerRequestQueued = true;
-						if (currentPeriodicTrigger != null) {
-							currentPeriodicTrigger.cancel();
-							currentPeriodicTrigger = null;
-						}
-						return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-					}
+				// end of lock scope
+
+				// send the messages to the tasks that trigger their checkpoint
+				for (int i = 0; i < tasksToTrigger.length; i++) {
+					ExecutionAttemptID id = triggerIDs[i];
+					TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
+					tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
 				}
 
-				pendingCheckpoints.put(checkpointID, checkpoint);
-				timer.schedule(canceller, checkpointTimeout);
+				numUnsuccessfulCheckpointsTriggers = 0;
+				return new CheckpointTriggerResult(checkpoint);
 			}
-			// end of lock scope
+			catch (Throwable t) {
+				// guard the map against concurrent modifications
+				synchronized (lock) {
+					pendingCheckpoints.remove(checkpointID);
+				}
 
-			// send the messages to the tasks that trigger their checkpoint
-			for (int i = 0; i < tasksToTrigger.length; i++) {
-				ExecutionAttemptID id = triggerIDs[i];
-				TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
-				tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
-			}
+				int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
 
-			numUnsuccessfulCheckpointsTriggers = 0;
-			return new CheckpointTriggerResult(checkpoint);
-		}
-		catch (Throwable t) {
-			// guard the map against concurrent modifications
-			synchronized (lock) {
-				pendingCheckpoints.remove(checkpointID);
+				if (!checkpoint.isDiscarded()) {
+					checkpoint.abortError(new Exception("Failed to trigger checkpoint"));
+				}
+				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
-			int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
-			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
-
-			if (!checkpoint.isDiscarded()) {
-				checkpoint.abortError(new Exception("Failed to trigger checkpoint"));
-			}
-			return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
-		}
+		} // end trigger lock
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4da40bcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 91a83b2..c43cf2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -29,10 +28,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.Tasks;
-
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import org.junit.Test;
 
 import scala.concurrent.Await;


[2/6] flink git commit: [hotfix] Reduce string concatenations in ExecutionVertex

Posted by se...@apache.org.
[hotfix] Reduce string concatenations in ExecutionVertex


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e45659a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e45659a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e45659a

Branch: refs/heads/master
Commit: 4e45659a5abefbfbd693e3754a19fab57b405427
Parents: 42f65e4
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 14:02:25 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 19:27:29 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/executiongraph/ExecutionVertex.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e45659a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 08bf57f..2495316 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -702,7 +702,7 @@ public class ExecutionVertex {
 	 * @return A simple name representation.
 	 */
 	public String getSimpleName() {
-		return getTaskName() + " (" + (getParallelSubtaskIndex()+1) + '/' + getTotalNumberOfParallelSubtasks() + ')';
+		return taskNameWithSubtask;
 	}
 
 	@Override


[6/6] flink git commit: [hotfix] [tests] Fix mini cluster usage and logging/printing in CustomDistributionITCase

Posted by se...@apache.org.
[hotfix] [tests] Fix mini cluster usage and logging/printing in CustomDistributionITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/addad1af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/addad1af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/addad1af

Branch: refs/heads/master
Commit: addad1af453a088c559db234370db565a35fbc11
Parents: 635c869
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 21:02:09 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 21:19:04 2016 +0200

----------------------------------------------------------------------
 .../CustomDistributionITCase.java               | 110 +++++++++++--------
 1 file changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/addad1af/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index c6bc08e..ca2c156 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -30,30 +30,60 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
-import org.junit.Test;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import java.io.IOException;
 
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("serial")
+public class CustomDistributionITCase extends TestLogger {
 
-public class CustomDistributionITCase {
+	// ------------------------------------------------------------------------
+	//  The mini cluster that is shared across tests
+	// ------------------------------------------------------------------------
 
-	@Test
-	public void testPartitionWithDistribution1() throws Exception{
-		/*
-		 * Test the record partitioned rightly with one field according to the customized data distribution
-		 */
+	private static ForkableFlinkMiniCluster cluster;
 
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+	@BeforeClass
+	public static void setup() throws Exception {
+		cluster = TestBaseUtils.startCluster(1, 8, false, false, true);
+	}
 
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+	@AfterClass
+	public static void teardown() throws Exception {
+		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+
+	@Before
+	public void prepare() {
+		TestEnvironment clusterEnv = new TestEnvironment(cluster, 1);
+		clusterEnv.setAsContext();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Test the record partitioned rightly with one field according to the customized data distribution
+	 */
+	@Test
+	public void testPartitionWithDistribution1() throws Exception {
 		final TestDataDist1 dist = new TestDataDist1();
 
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(dist.getParallelism());
 
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
 		DataSet<Boolean> result = DataSetUtils
 			.partitionByRange(input, dist, 0)
 			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@@ -96,13 +126,15 @@ public class CustomDistributionITCase {
 		env.execute();
 	}
 
+	/**
+	 * Tests that records are partitioned correctly on two fields according to the custom data distribution.
+	 */
 	@Test
-	public void testRangeWithDistribution2() throws Exception{
-		/*
-		 * Test the record partitioned rightly with two fields according to the customized data distribution
-		 */
+	public void testRangeWithDistribution2() throws Exception {
+		final TestDataDist2 dist = new TestDataDist2();
 
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(dist.getParallelism());
 
 		DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements(
 						new Tuple3<>(1, 5, "Hi"),
@@ -122,10 +154,6 @@ public class CustomDistributionITCase {
 						new Tuple3<>(5, 3, "Hi Java again")
 			);
 
-		final TestDataDist2 dist = new TestDataDist2();
-
-		env.setParallelism(dist.getParallelism());
-
 		DataSet<Boolean> result = DataSetUtils
 			.partitionByRange(input, dist, 0, 1)
 			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
@@ -175,18 +203,18 @@ public class CustomDistributionITCase {
 		env.execute();
 	}
 
+	/*
+	 * Tests range partitioning when the number of partition keys is less than the number of distribution fields.
+	 */
 	@Test
-	public void testPartitionKeyLessDistribution() throws Exception{
-		/*
-		 * Test the number of partition keys less than the number of distribution fields
-		 */
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+	public void testPartitionKeyLessDistribution() throws Exception {
 		final TestDataDist2 dist = new TestDataDist2();
 
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(dist.getParallelism());
 
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
 		DataSet<Boolean> result = DataSetUtils
 			.partitionByRange(input, dist, 0)
 			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@@ -229,19 +257,17 @@ public class CustomDistributionITCase {
 		env.execute();
 	}
 
+	/*
+	 * Tests that using more partition keys than the distribution has fields fails with an IllegalArgumentException.
+	 */
 	@Test(expected = IllegalArgumentException.class)
-	public void testPartitionMoreThanDistribution() throws Exception{
-		/*
-		 * Test the number of partition keys larger than the number of distribution fields
-		 */
+	public void testPartitionMoreThanDistribution() throws Exception {
+		final TestDataDist2 dist = new TestDataDist2();
 
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		final TestDataDist2 dist = new TestDataDist2();
-
-		DataSet<Tuple3<Integer, Long, String>> result = DataSetUtils
-				.partitionByRange(input, dist, 0, 1, 2);
+		DataSetUtils.partitionByRange(input, dist, 0, 1, 2);
 	}
 	
 	/**
@@ -278,14 +304,10 @@ public class CustomDistributionITCase {
 		}
 
 		@Override
-		public void write(DataOutputView out) throws IOException {
-			
-		}
+		public void write(DataOutputView out) throws IOException {}
 
 		@Override
-		public void read(DataInputView in) throws IOException {
-			
-		}
+		public void read(DataInputView in) throws IOException {}
 	}
 
 	/**
@@ -323,13 +345,9 @@ public class CustomDistributionITCase {
 		}
 
 		@Override
-		public void write(DataOutputView out) throws IOException {
-			
-		}
+		public void write(DataOutputView out) throws IOException {}
 
 		@Override
-		public void read(DataInputView in) throws IOException {
-			
-		}
+		public void read(DataInputView in) throws IOException {}
 	}
 }


[3/6] flink git commit: [FLINK-4417] [checkpoints] Checkpoints are subsumed by CheckpointID, not by timestamp

Posted by se...@apache.org.
[FLINK-4417] [checkpoints] Checkpoints are subsumed by CheckpointID, not by timestamp

This closes #2407


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e9d1775
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e9d1775
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e9d1775

Branch: refs/heads/master
Commit: 4e9d1775b5514c87981c78d55323cc2b17361867
Parents: 4da40bc
Author: Ramkrishna <ra...@intel.com>
Authored: Tue Aug 23 21:53:31 2016 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 19:56:17 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/CheckpointCoordinator.java       | 7 ++++---
 .../org/apache/flink/runtime/jobmanager/JobManager.scala      | 2 +-
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e9d1775/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 2c0e63b..b710324 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -671,7 +671,7 @@ public class CheckpointCoordinator {
 						pendingCheckpoints.remove(checkpointId);
 						rememberRecentCheckpointId(checkpointId);
 
-						dropSubsumedCheckpoints(completed.getTimestamp());
+						dropSubsumedCheckpoints(completed.getCheckpointID());
 
 						triggerQueuedRequests();
 					}
@@ -726,12 +726,13 @@ public class CheckpointCoordinator {
 		recentPendingCheckpoints.addLast(id);
 	}
 
-	private void dropSubsumedCheckpoints(long timestamp) throws Exception {
+	private void dropSubsumedCheckpoints(long checkpointId) throws Exception {
 		Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
 
 		while (entries.hasNext()) {
 			PendingCheckpoint p = entries.next().getValue();
-			if (p.getCheckpointTimestamp() <= timestamp && p.canBeSubsumed()) {
+			// remove all pending checkpoints whose ID is lower than the ID of the checkpoint that just completed
+			if (p.getCheckpointId() < checkpointId && p.canBeSubsumed()) {
 				rememberRecentCheckpointId(p.getCheckpointId());
 				p.abortSubsumed();
 				entries.remove();

http://git-wip-us.apache.org/repos/asf/flink/blob/4e9d1775/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d172a2b..34fed3f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1418,7 +1418,7 @@ class JobManager(
             if (checkpointCoordinator != null) {
               future {
                 try {
-                  if (checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
+                  if (!checkpointCoordinator.receiveDeclineMessage(declineMessage)) {
                     log.info("Received message for non-existing checkpoint " +
                       declineMessage.getCheckpointId)
                   }