You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/02 11:56:16 UTC
[1/4] flink git commit: [FLINK-2748][jobmanager] accumulator fetch
failure leads to duplicate job result response
Repository: flink
Updated Branches:
refs/heads/master 9fe285a77 -> df871b3ca
[FLINK-2748][jobmanager] accumulator fetch failure leads to duplicate job result response
This closes #1206.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/19fd5bd0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/19fd5bd0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/19fd5bd0
Branch: refs/heads/master
Commit: 19fd5bd0386e69da14f2ae24fee0aba8e34f3ce0
Parents: 9fe285a
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 1 16:16:11 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:51:46 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmanager/JobManager.scala | 20 ++++++++------------
1 file changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/19fd5bd0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b7f76ce..9417b38 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -382,27 +382,23 @@ class JobManager(
if (jobInfo.client != ActorRef.noSender) {
newJobStatus match {
case JobStatus.FINISHED =>
- val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
- executionGraph.getAccumulatorsSerialized()
+ try {
+ val accumulatorResults = executionGraph.getAccumulatorsSerialized()
+ val result = new SerializedJobExecutionResult(
+ jobID,
+ jobInfo.duration,
+ accumulatorResults)
+
+ jobInfo.client ! decorateMessage(JobResultSuccess(result))
} catch {
case e: Exception =>
log.error(s"Cannot fetch final accumulators for job $jobID", e)
-
val exception = new JobExecutionException(jobID,
"Failed to retrieve accumulator results.", e)
jobInfo.client ! decorateMessage(JobResultFailure(
new SerializedThrowable(exception)))
-
- Collections.emptyMap()
}
-
- val result = new SerializedJobExecutionResult(
- jobID,
- jobInfo.duration,
- accumulatorResults)
- jobInfo.client ! decorateMessage(JobResultSuccess(result))
-
case JobStatus.CANCELED =>
// the error may be packed as a serialized throwable
val unpackedError = SerializedThrowable.get(
[3/4] flink git commit: [FLINK-1599][docs] TypeComperator with no
keys and comparators matches some elements
Posted by mx...@apache.org.
[FLINK-1599][docs] TypeComperator with no keys and comparators matches some elements
- update JavaDoc to clarify the usage of the extractKey(..) and
getFlatComparators() method
This closes #1207.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df871b3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df871b3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df871b3c
Branch: refs/heads/master
Commit: df871b3ca1effdb2f4d5cbdb9d35c62a60a1564f
Parents: 142a844
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 1 17:42:42 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:52:02 2015 +0200
----------------------------------------------------------------------
.../flink/api/common/typeutils/TypeComparator.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/df871b3c/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
index f98a05e..d017694 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
@@ -288,14 +288,20 @@ public abstract class TypeComparator<T> implements Serializable {
/**
* Extracts the key fields from a record. This is for use by the PairComparator to provide
- * interoperability between different record types.
+ * interoperability between different record types. Note, that at least one key should be extracted.
+ * @param record The record that contains the key(s)
+ * @param target The array to write the key(s) into.
+ * @param index The offset of the target array to start writing into.
* @return the number of keys added to target.
*/
public abstract int extractKeys(Object record, Object[] target, int index);
/**
- * Get the field comparators. This is used together with {@link #extractKeys(Object, Object[], int)} to provide
- * interoperability between different record types.
+ * Get the field comparators. This is used together with {@link #extractKeys(Object, Object[], int)}
+ * to provide interoperability between different record types. Note, that this should return at
+ * least one Comparator and that the number of Comparators must match the number of extracted
+ * keys.
+ * @return An Array of Comparators for the extracted keys.
*/
@SuppressWarnings("rawtypes")
public abstract TypeComparator[] getFlatComparators();
[2/4] flink git commit: [FLINK-2776][cli] print job id when
submitting a job
Posted by mx...@apache.org.
[FLINK-2776][cli] print job id when submitting a job
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc7369e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc7369e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc7369e1
Branch: refs/heads/master
Commit: fc7369e1e1862d2019a737c1ae811ccf6eca3c3e
Parents: 19fd5bd
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Sep 28 21:12:08 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:52:02 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 30 +++++++-------------
.../flink/yarn/YARNSessionFIFOITCase.java | 9 +++++-
2 files changed, 18 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fc7369e1/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index f0e6c4f..d071cdb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -647,20 +647,13 @@ public class CliFrontend {
program.deleteExtractedLibraries();
}
- if (result != null) {
- // if the job has been submitted to a detached YARN cluster, there won't be any
- // exec results, but the object will be set (for the job id)
- if (yarnCluster != null && yarnCluster.isDetached()) {
+ if (yarnCluster != null && yarnCluster.isDetached()) {
+ yarnCluster.stopAfterJob(result.getJobID());
+ yarnCluster.disconnect();
+ }
- yarnCluster.stopAfterJob(result.getJobID());
- yarnCluster.disconnect();
- if (!webFrontend) {
- System.out.println("The Job has been submitted with JobID " + result.getJobID());
- }
- return 0;
- } else {
- throw new RuntimeException("Error while starting job. No Job ID set.");
- }
+ if (!webFrontend) {
+ System.out.println("Job has been submitted with JobID " + result.getJobID());
}
return 0;
@@ -683,17 +676,14 @@ public class CliFrontend {
LOG.info("Program execution finished");
- if (result != null) {
- if (!webFrontend) {
- System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
- }
+ if (!webFrontend) {
+ System.out.println("Job with JobID " + result.getJobID() + " has finished.");
+ System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
- if (accumulatorsResult.size() > 0 && !webFrontend) {
+ if (accumulatorsResult.size() > 0) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
- } else {
- LOG.info("The Job did not return an execution result");
}
return 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/fc7369e1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index cd2bdc6..9d72c90 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -487,11 +487,18 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-ytm", "1024",
"-ys", "2", // test requesting slots from YARN.
"--yarndetached", job, tmpInFile.getAbsoluteFile().toString() , tmpOutFolder.getAbsoluteFile().toString()},
- "The Job has been submitted with JobID",
+ "Job has been submitted with JobID",
RunTypes.CLI_FRONTEND);
// it should usually be 2, but on slow machines, the number varies
Assert.assertTrue("There should be at most 2 containers running", getRunningContainers() <= 2);
+ // give the runner some time to detach
+ for (int attempt = 0; runner.isAlive() && attempt < 5; attempt++) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ }
Assert.assertFalse("The runner should detach.", runner.isAlive());
LOG.info("CLI Frontend has returned, so the job is running");
[4/4] flink git commit: [FLINK-2762][cli] print execution result only
if available
Posted by mx...@apache.org.
[FLINK-2762][cli] print execution result only if available
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/142a8440
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/142a8440
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/142a8440
Branch: refs/heads/master
Commit: 142a8440f3e31be3c5c83f87a36cf468c103a9e2
Parents: fc7369e
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Oct 1 15:58:53 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Oct 2 11:52:02 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 21 +++++++++++---------
.../org/apache/flink/client/program/Client.java | 4 ++--
2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/142a8440/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index d071cdb..034227e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -39,12 +39,12 @@ import java.util.Properties;
import akka.actor.ActorSystem;
import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.InfoOptions;
@@ -638,6 +638,8 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
+ LOG.info("Starting execution of program");
+
JobSubmissionResult result;
try {
result = client.runDetached(program, parallelism);
@@ -662,7 +664,7 @@ public class CliFrontend {
protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
LOG.info("Starting execution of program");
- JobExecutionResult result;
+ JobSubmissionResult result;
try {
client.setPrintStatusDuringExecution(true);
result = client.runBlocking(program, parallelism);
@@ -676,13 +678,14 @@ public class CliFrontend {
LOG.info("Program execution finished");
- if (!webFrontend) {
- System.out.println("Job with JobID " + result.getJobID() + " has finished.");
- System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
- Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+ if (result instanceof JobExecutionResult && !webFrontend) {
+ JobExecutionResult execResult = (JobExecutionResult) result;
+ System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
+ System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
+ Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
- System.out.println("Accumulator Results: ");
- System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+ System.out.println("Accumulator Results: ");
+ System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/142a8440/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index c72681d..91ed665 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -275,7 +275,7 @@ public class Client {
// Program submission / execution
// ------------------------------------------------------------------------
- public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
+ public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
return runBlocking(prog.getPlanWithJars(), parallelism);
@@ -292,7 +292,7 @@ public class Client {
ContextEnvironment.unsetContext();
}
- return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID));
+ return new JobSubmissionResult(lastJobID);
}
else {
throw new RuntimeException();