You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/02 11:56:16 UTC

[1/4] flink git commit: [FLINK-2748][jobmanager] accumulator fetch failure leads to duplicate job result response

Repository: flink
Updated Branches:
  refs/heads/master 9fe285a77 -> df871b3ca


[FLINK-2748][jobmanager] accumulator fetch failure leads to duplicate job result response

This closes #1206.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19fd5bd0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19fd5bd0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19fd5bd0

Branch: refs/heads/master
Commit: 19fd5bd0386e69da14f2ae24fee0aba8e34f3ce0
Parents: 9fe285a
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 1 16:16:11 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:51:46 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/19fd5bd0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b7f76ce..9417b38 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -382,27 +382,23 @@ class JobManager(
             if (jobInfo.client != ActorRef.noSender) {
               newJobStatus match {
                 case JobStatus.FINISHED =>
-                  val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
-                    executionGraph.getAccumulatorsSerialized()
+                  try {
+                    val accumulatorResults = executionGraph.getAccumulatorsSerialized()
+                    val result = new SerializedJobExecutionResult(
+                      jobID,
+                      jobInfo.duration,
+                      accumulatorResults)
+
+                    jobInfo.client ! decorateMessage(JobResultSuccess(result))
                   } catch {
                     case e: Exception =>
                       log.error(s"Cannot fetch final accumulators for job $jobID", e)
-
                       val exception = new JobExecutionException(jobID,
                         "Failed to retrieve accumulator results.", e)
 
                       jobInfo.client ! decorateMessage(JobResultFailure(
                         new SerializedThrowable(exception)))
-
-                      Collections.emptyMap()
                   }
-
-                  val result = new SerializedJobExecutionResult(
-                    jobID,
-                    jobInfo.duration,
-                    accumulatorResults)
-                  jobInfo.client ! decorateMessage(JobResultSuccess(result))
-
                 case JobStatus.CANCELED =>
                   // the error may be packed as a serialized throwable
                   val unpackedError = SerializedThrowable.get(


[3/4] flink git commit: [FLINK-1599][docs] TypeComparator with no keys and comparators matches some elements

Posted by mx...@apache.org.
[FLINK-1599][docs] TypeComparator with no keys and comparators matches some elements

- update JavaDoc to clarify the usage of the extractKeys(..) and
  getFlatComparators() methods

This closes #1207.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df871b3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df871b3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df871b3c

Branch: refs/heads/master
Commit: df871b3ca1effdb2f4d5cbdb9d35c62a60a1564f
Parents: 142a844
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 1 17:42:42 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:52:02 2015 +0200

----------------------------------------------------------------------
 .../flink/api/common/typeutils/TypeComparator.java      | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df871b3c/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
index f98a05e..d017694 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
@@ -288,14 +288,20 @@ public abstract class TypeComparator<T> implements Serializable {
 
 	/**
 	 * Extracts the key fields from a record. This is for use by the PairComparator to provide
-	 * interoperability between different record types.
+	 * interoperability between different record types. Note, that at least one key should be extracted.
+	 * @param record The record that contains the key(s)
+	 * @param target The array to write the key(s) into.
+	 * @param index The offset of the target array to start writing into.
 	 * @return the number of keys added to target.
 	 */
 	public abstract int extractKeys(Object record, Object[] target, int index);
 
 	/**
-	 * Get the field comparators. This is used together with {@link #extractKeys(Object, Object[], int)} to provide
-	 * interoperability between different record types.
+	 * Get the field comparators. This is used together with {@link #extractKeys(Object, Object[], int)}
+	 * to provide interoperability between different record types. Note, that this should return at
+	 * least one Comparator and that the number of Comparators must match the number of extracted
+	 * keys.
+	 * @return An Array of Comparators for the extracted keys.
 	 */
 	@SuppressWarnings("rawtypes")
 	public abstract TypeComparator[] getFlatComparators();


[2/4] flink git commit: [FLINK-2776][cli] print job id when submitting a job

Posted by mx...@apache.org.
[FLINK-2776][cli] print job id when submitting a job


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc7369e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc7369e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc7369e1

Branch: refs/heads/master
Commit: fc7369e1e1862d2019a737c1ae811ccf6eca3c3e
Parents: 19fd5bd
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Sep 28 21:12:08 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:52:02 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 30 +++++++-------------
 .../flink/yarn/YARNSessionFIFOITCase.java       |  9 +++++-
 2 files changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc7369e1/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index f0e6c4f..d071cdb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -647,20 +647,13 @@ public class CliFrontend {
 			program.deleteExtractedLibraries();
 		}
 
-		if (result != null) {
-			// if the job has been submitted to a detached YARN cluster, there won't be any
-			// exec results, but the object will be set (for the job id)
-			if (yarnCluster != null && yarnCluster.isDetached()) {
+		if (yarnCluster != null && yarnCluster.isDetached()) {
+			yarnCluster.stopAfterJob(result.getJobID());
+			yarnCluster.disconnect();
+		}
 
-				yarnCluster.stopAfterJob(result.getJobID());
-				yarnCluster.disconnect();
-				if (!webFrontend) {
-					System.out.println("The Job has been submitted with JobID " + result.getJobID());
-				}
-				return 0;
-			} else {
-				throw new RuntimeException("Error while starting job. No Job ID set.");
-			}
+		if (!webFrontend) {
+			System.out.println("Job has been submitted with JobID " + result.getJobID());
 		}
 
 		return 0;
@@ -683,17 +676,14 @@ public class CliFrontend {
 
 		LOG.info("Program execution finished");
 
-		if (result != null) {
-			if (!webFrontend) {
-				System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
-			}
+		if (!webFrontend) {
+			System.out.println("Job with JobID " + result.getJobID() + " has finished.");
+			System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
 			Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
-			if (accumulatorsResult.size() > 0 && !webFrontend) {
+			if (accumulatorsResult.size() > 0) {
 				System.out.println("Accumulator Results: ");
 				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
 			}
-		} else {
-			LOG.info("The Job did not return an execution result");
 		}
 
 		return 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/fc7369e1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index cd2bdc6..9d72c90 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -487,11 +487,18 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 						"-ytm", "1024",
 						"-ys", "2", // test requesting slots from YARN.
 						"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
-				"The Job has been submitted with JobID",
+				"Job has been submitted with JobID",
 				RunTypes.CLI_FRONTEND);
 
 		// it should usually be 2, but on slow machines, the number varies
 		Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
+		// give the runner some time to detach
+		for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
+			try {
+				Thread.sleep(500);
+			} catch (InterruptedException e) {
+			}
+		}
 		Assert.assertFalse("The runner should detach.", runner.isAlive());
 		LOG.info("CLI Frontend has returned, so the job is running");
 


[4/4] flink git commit: [FLINK-2762][cli] print execution result only if available

Posted by mx...@apache.org.
[FLINK-2762][cli] print execution result only if available


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/142a8440
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/142a8440
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/142a8440

Branch: refs/heads/master
Commit: 142a8440f3e31be3c5c83f87a36cf468c103a9e2
Parents: fc7369e
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 1 15:58:53 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:52:02 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 21 +++++++++++---------
 .../org/apache/flink/client/program/Client.java |  4 ++--
 2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/142a8440/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index d071cdb..034227e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -39,12 +39,12 @@ import java.util.Properties;
 import akka.actor.ActorSystem;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.client.cli.CancelOptions;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.InfoOptions;
@@ -638,6 +638,8 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+		LOG.info("Starting execution of program");
+
 		JobSubmissionResult result;
 		try {
 			result = client.runDetached(program, parallelism);
@@ -662,7 +664,7 @@ public class CliFrontend {
 	protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
 		LOG.info("Starting execution of program");
 
-		JobExecutionResult result;
+		JobSubmissionResult result;
 		try {
 			client.setPrintStatusDuringExecution(true);
 			result = client.runBlocking(program, parallelism);
@@ -676,13 +678,14 @@ public class CliFrontend {
 
 		LOG.info("Program execution finished");
 
-		if (!webFrontend) {
-			System.out.println("Job with JobID " + result.getJobID() + " has finished.");
-			System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
-			Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+		if (result instanceof JobExecutionResult && !webFrontend) {
+			JobExecutionResult execResult = (JobExecutionResult) result;
+			System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
+			System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
+			Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
 			if (accumulatorsResult.size() > 0) {
-				System.out.println("Accumulator Results: ");
-				System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+					System.out.println("Accumulator Results: ");
+					System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/142a8440/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index c72681d..91ed665 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -275,7 +275,7 @@ public class Client {
 	//  Program submission / execution
 	// ------------------------------------------------------------------------
 
-	public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
+	public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
 			return runBlocking(prog.getPlanWithJars(), parallelism);
@@ -292,7 +292,7 @@ public class Client {
 				ContextEnvironment.unsetContext();
 			}
 
-			return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
+			return new JobSubmissionResult(lastJobID);
 		}
 		else {
 			throw new RuntimeException();