You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/31 10:49:26 UTC
[4/4] flink git commit: [FLINK-1771] Add support for submitting
single jobs to a detached YARN session
[FLINK-1771] Add support for submitting single jobs to a detached YARN session
With this change, users can submit a Flink job to a YARN cluster without having a local client monitoring the Application Master or job. You can essentially fire-and-forget a Flink job to YARN.
To support this, the ApplicationMaster can now monitor the status of a job and shut itself down once the job is in a terminal state.
The change also verifies that various ways of setting the parallelism on YARN are passed through the system correctly (per job, session).
There was a bug in YARN container creation which made the configuration values for the heap offset useless. This change fixes this error.
All mentioned features and bugs are covered by the flink-yarn-tests.
This closes #542
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b0d4076
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b0d4076
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b0d4076
Branch: refs/heads/master
Commit: 6b0d40764da9dce2e2d21882e9a03a21c6783ff0
Parents: 121a5a0
Author: Robert Metzger <rm...@apache.org>
Authored: Fri Mar 13 15:53:51 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Mar 31 08:56:39 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 116 ++++++---
.../flink/client/FlinkYarnSessionCli.java | 12 +-
.../org/apache/flink/client/LocalExecutor.java | 19 +-
.../org/apache/flink/client/RemoteExecutor.java | 27 ++-
.../org/apache/flink/client/program/Client.java | 85 ++++---
.../client/program/ContextEnvironment.java | 39 ++-
.../flink/client/CliFrontendInfoTest.java | 4 +-
.../flink/client/CliFrontendListCancelTest.java | 2 +-
.../apache/flink/client/CliFrontendRunTest.java | 2 +-
.../client/program/ClientConnectionTest.java | 2 +-
.../apache/flink/client/program/ClientTest.java | 9 +-
.../ExecutionPlanAfterExecutionTest.java | 3 +-
.../program/ExecutionPlanCreationTest.java | 2 +-
.../flink/api/common/JobExecutionResult.java | 7 +-
.../java/org/apache/flink/api/common/JobID.java | 60 +++++
.../flink/api/common/JobSubmissionResult.java | 39 +++
.../common/operators/CollectionExecutor.java | 2 +-
.../main/flink-bin/conf/log4j-cli.properties | 1 +
.../examples/java/wordcount/WordCount.java | 1 -
flink-optimizer/pom.xml | 19 +-
.../org/apache/flink/optimizer/Optimizer.java | 20 +-
.../optimizer/AdditionalOperatorsTest.java | 1 +
.../optimizer/BranchingPlansCompilerTest.java | 1 +
.../BroadcastVariablePipelinebreakerTest.java | 1 +
.../CachedMatchStrategyCompilerTest.java | 1 +
.../optimizer/CoGroupSolutionSetFirstTest.java | 1 +
.../flink/optimizer/CompilerTestBase.java | 229 ------------------
.../flink/optimizer/DisjointDataFlowsTest.java | 1 +
.../optimizer/DistinctCompilationTest.java | 1 +
.../apache/flink/optimizer/GroupOrderTest.java | 1 +
.../optimizer/HardPlansCompilationTest.java | 1 +
.../flink/optimizer/IterationsCompilerTest.java | 1 +
.../flink/optimizer/NestedIterationsTest.java | 1 +
.../flink/optimizer/ParallelismChangeTest.java | 1 +
.../flink/optimizer/PartitionPushdownTest.java | 1 +
.../optimizer/PartitioningReusageTest.java | 1 +
.../flink/optimizer/PipelineBreakerTest.java | 1 +
.../flink/optimizer/PropertyDataSourceTest.java | 1 +
.../apache/flink/optimizer/ReduceAllTest.java | 1 +
.../optimizer/ReplicatingDataSourceTest.java | 1 +
.../SemanticPropertiesAPIToPlanTest.java | 1 +
.../flink/optimizer/SortPartialReuseTest.java | 1 +
.../UnionBetweenDynamicAndStaticPathTest.java | 1 +
.../optimizer/UnionPropertyPropagationTest.java | 1 +
.../flink/optimizer/UnionReplacementTest.java | 1 +
.../WorksetIterationCornerCasesTest.java | 1 +
.../WorksetIterationsRecordApiCompilerTest.java | 1 +
...naryCustomPartitioningCompatibilityTest.java | 2 +-
.../CoGroupCustomPartitioningTest.java | 2 +-
...ustomPartitioningGlobalOptimizationTest.java | 2 +-
.../custompartition/CustomPartitioningTest.java | 2 +-
.../GroupingKeySelectorTranslationTest.java | 2 +-
.../GroupingPojoTranslationTest.java | 2 +-
.../GroupingTupleTranslationTest.java | 2 +-
.../JoinCustomPartitioningTest.java | 2 +-
.../DataExchangeModeClosedBranchingTest.java | 2 +-
.../DataExchangeModeForwardTest.java | 2 +-
.../DataExchangeModeOpenBranchingTest.java | 2 +-
.../java/DeltaIterationDependenciesTest.java | 2 +-
.../java/DistinctAndGroupingOptimizerTest.java | 2 +-
.../java/GroupReduceCompilationTest.java | 2 +-
.../optimizer/java/IterationCompilerTest.java | 2 +-
.../optimizer/java/JoinTranslationTest.java | 2 +-
.../flink/optimizer/java/OpenIterationTest.java | 2 +-
.../optimizer/java/PartitionOperatorTest.java | 2 +-
.../optimizer/java/ReduceCompilationTest.java | 2 +-
.../WorksetIterationsJavaApiCompilerTest.java | 2 +-
.../CoGroupOnConflictingPartitioningsTest.java | 2 +-
.../JoinOnConflictingPartitioningsTest.java | 2 +-
.../flink/optimizer/util/CompilerTestBase.java | 240 +++++++++++++++++++
.../flink/optimizer/util/OperatorResolver.java | 127 ++++++++++
flink-quickstart/pom.xml | 2 +-
.../runtime/accumulators/AccumulatorEvent.java | 2 +-
.../apache/flink/runtime/blob/BlobClient.java | 2 +-
.../apache/flink/runtime/blob/BlobServer.java | 2 +-
.../runtime/blob/BlobServerConnection.java | 2 +-
.../apache/flink/runtime/blob/BlobUtils.java | 2 +-
.../client/JobCancellationException.java | 2 +-
.../runtime/client/JobExecutionException.java | 2 +-
.../flink/runtime/client/JobStatusMessage.java | 2 +-
.../runtime/client/JobSubmissionException.java | 2 +-
.../runtime/client/JobTimeoutException.java | 2 +-
.../deployment/TaskDeploymentDescriptor.java | 2 +-
.../flink/runtime/execution/Environment.java | 2 +-
.../runtime/execution/RuntimeEnvironment.java | 2 +-
.../librarycache/BlobLibraryCacheManager.java | 2 +-
.../FallbackLibraryCacheManager.java | 2 +-
.../librarycache/LibraryCacheManager.java | 2 +-
.../runtime/executiongraph/ExecutionGraph.java | 2 +-
.../executiongraph/ExecutionJobVertex.java | 2 +-
.../runtime/executiongraph/ExecutionVertex.java | 2 +-
.../flink/runtime/filecache/FileCache.java | 2 +-
.../apache/flink/runtime/instance/Instance.java | 2 +-
.../flink/runtime/instance/SharedSlot.java | 2 +-
.../flink/runtime/instance/SimpleSlot.java | 2 +-
.../org/apache/flink/runtime/instance/Slot.java | 2 +-
.../runtime/io/network/NetworkEnvironment.java | 2 +-
.../io/network/partition/ResultPartition.java | 2 +-
.../ResultPartitionConsumableNotifier.java | 3 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 1 +
.../apache/flink/runtime/jobgraph/JobID.java | 61 -----
.../accumulators/AccumulatorManager.java | 2 +-
.../jobmanager/web/JobManagerInfoServlet.java | 2 +-
.../runtime/profiling/JobManagerProfiler.java | 2 +-
.../profiling/impl/EnvironmentThreadSet.java | 2 +-
.../InternalExecutionVertexProfilingData.java | 2 +-
...ernalExecutionVertexThreadProfilingData.java | 2 +-
.../profiling/types/InstanceProfilingEvent.java | 2 +-
.../types/InstanceSummaryProfilingEvent.java | 2 +-
.../runtime/profiling/types/ProfilingEvent.java | 2 +-
.../types/SingleInstanceProfilingEvent.java | 2 +-
.../profiling/types/ThreadProfilingEvent.java | 2 +-
.../profiling/types/VertexProfilingEvent.java | 2 +-
.../apache/flink/runtime/taskmanager/Task.java | 2 +-
.../runtime/taskmanager/TaskExecutionState.java | 2 +-
.../taskmanager/TaskInputSplitProvider.java | 2 +-
.../runtime/yarn/AbstractFlinkYarnClient.java | 84 ++++++-
.../runtime/yarn/AbstractFlinkYarnCluster.java | 29 +++
.../apache/flink/runtime/client/JobClient.scala | 4 +-
.../flink/runtime/jobmanager/JobManager.scala | 6 +-
.../runtime/jobmanager/MemoryArchivist.scala | 2 +-
.../StreamCheckpointCoordinator.scala | 3 +-
.../runtime/messages/ArchiveMessages.scala | 2 +-
.../messages/ExecutionGraphMessages.scala | 3 +-
.../runtime/messages/JobManagerMessages.scala | 3 +-
.../minicluster/LocalFlinkMiniCluster.scala | 4 +
.../flink/runtime/blob/BlobClientTest.java | 2 +-
.../runtime/blob/BlobServerDeleteTest.java | 2 +-
.../flink/runtime/blob/BlobServerPutTest.java | 2 +-
.../TaskDeploymentDescriptorTest.java | 2 +-
.../BlobLibraryCacheManagerTest.java | 2 +-
.../ExecutionGraphConstructionTest.java | 2 +-
.../ExecutionGraphDeploymentTest.java | 2 +-
.../executiongraph/ExecutionGraphTestUtils.java | 2 +-
.../ExecutionStateProgressTest.java | 2 +-
.../ExecutionVertexCancelTest.java | 2 +-
.../ExecutionVertexDeploymentTest.java | 2 +-
.../ExecutionVertexSchedulingTest.java | 2 +-
.../executiongraph/PointwisePatternTest.java | 2 +-
.../executiongraph/VertexSlotSharingTest.java | 2 +-
.../FileCacheDeleteValidationTest.java | 2 +-
.../flink/runtime/instance/InstanceTest.java | 2 +-
.../flink/runtime/instance/SimpleSlotTest.java | 2 +-
.../consumer/LocalInputChannelTest.java | 2 +-
.../flink/runtime/jobgraph/JobIdTest.java | 1 +
.../scheduler/SchedulerTestUtils.java | 2 +-
.../jobmanager/scheduler/SharedSlotsTest.java | 2 +-
.../scheduler/SlotAllocationFutureTest.java | 2 +-
.../operators/testutils/MockEnvironment.java | 2 +-
.../profiling/types/ProfilingTypesTest.java | 5 +-
.../taskmanager/TaskExecutionStateTest.java | 2 +-
.../runtime/taskmanager/TaskManagerTest.java | 2 +-
.../flink/runtime/taskmanager/TaskTest.java | 2 +-
.../ExecutionGraphRestartTest.scala | 3 +-
.../TaskManagerLossFailsTasksTest.scala | 3 +-
.../testingUtils/TestingJobManager.scala | 3 +-
.../TestingJobManagerMessages.scala | 3 +-
.../testingUtils/TestingTaskManager.scala | 2 +-
.../TestingTaskManagerMessages.scala | 2 +-
.../api/avro/AvroExternalJarProgramITCase.java | 2 +-
flink-staging/flink-spargel/pom.xml | 7 +
.../flink/spargel/java/SpargelCompilerTest.java | 2 +-
.../environment/RemoteStreamEnvironment.java | 11 +-
.../environment/StreamContextEnvironment.java | 18 +-
.../test/compiler/util/CompilerTestBase.java | 201 ----------------
.../test/compiler/util/OperatorResolver.java | 127 ----------
.../flink/test/util/RecordAPITestBase.java | 2 +-
.../apache/flink/test/util/TestEnvironment.java | 2 +-
flink-tests/pom.xml | 8 +
.../test/cancelling/CancellingTestBase.java | 2 +-
.../compiler/examples/KMeansSingleStepTest.java | 4 +-
.../examples/RelationalQueryCompilerTest.java | 4 +-
.../examples/WordCountCompilerTest.java | 2 +-
.../ConnectedComponentsCoGroupTest.java | 2 +-
.../iterations/ConnectedComponentsTest.java | 2 +-
.../iterations/IterativeKMeansTest.java | 4 +-
...ultipleJoinsWithSolutionSetCompilerTest.java | 2 +-
.../iterations/PageRankCompilerTest.java | 2 +-
.../compiler/plandump/DumpCompiledPlanTest.java | 2 +-
.../test/failingPrograms/TaskFailureITCase.java | 4 +-
.../flink/test/operators/ReduceITCase.java | 2 +-
.../flink/test/operators/UnionSinkITCase.java | 2 +-
.../PartitionOperatorTranslationTest.scala | 2 +-
.../CoGroupCustomPartitioningTest.scala | 2 +-
.../CoGroupGroupSortTranslationTest.scala | 2 +-
...tomPartitioningGroupingKeySelectorTest.scala | 2 +-
.../CustomPartitioningGroupingPojoTest.scala | 2 +-
.../CustomPartitioningGroupingTupleTest.scala | 2 +-
.../translation/CustomPartitioningTest.scala | 2 +-
.../JoinCustomPartitioningTest.scala | 2 +-
.../YARNSessionCapacitySchedulerITCase.java | 6 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 183 +++++++++++---
.../org/apache/flink/yarn/YarnTestBase.java | 78 +++++-
.../src/main/resources/log4j-test.properties | 2 +-
.../org/apache/flink/yarn/FlinkYarnClient.java | 29 ++-
.../org/apache/flink/yarn/FlinkYarnCluster.java | 144 ++++++++---
.../main/java/org/apache/flink/yarn/Utils.java | 7 +-
.../apache/flink/yarn/ApplicationClient.scala | 20 +-
.../flink/yarn/ApplicationMasterActor.scala | 70 +++++-
.../scala/org/apache/flink/yarn/Messages.scala | 10 +
.../java/org/apache/flink/yarn/UtilsTests.java | 18 +-
pom.xml | 2 +-
202 files changed, 1493 insertions(+), 1003 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index dd2a0ba..a13b322 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -42,6 +42,7 @@ import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.commons.cli.CommandLine;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
@@ -66,7 +67,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
@@ -265,12 +266,32 @@ public class CliFrontend {
}
try {
- Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
-
- int parallelism = options.getParallelism();
- int exitCode = executeProgram(program, client, parallelism);
-
- if (yarnCluster != null) {
+ int userParallelism = options.getParallelism();
+ LOG.debug("User parallelism is set to {}", userParallelism);
+
+ Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+ LOG.debug("Client slots is set to {}", client.getMaxSlots());
+ if(client.getMaxSlots() != -1 && userParallelism == -1) {
+ logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
+ "To use another parallelism, set it at the ./bin/flink client.");
+ userParallelism = client.getMaxSlots();
+ }
+ int exitCode = 0;
+
+ // check if detached per job yarn cluster is used to start flink
+ if(yarnCluster != null && yarnCluster.isDetached()) {
+ logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
+ "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+ "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
+ "Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
+ executeProgram(program, client, userParallelism, false);
+ } else {
+ // regular (blocking) execution.
+ exitCode = executeProgram(program, client, userParallelism, true);
+ }
+
+ // show YARN cluster status if its not a detached YARN cluster.
+ if (yarnCluster != null && !yarnCluster.isDetached()) {
List<String> msgs = yarnCluster.getNewMessages();
if (msgs != null && msgs.size() > 1) {
@@ -291,7 +312,7 @@ public class CliFrontend {
return handleError(t);
}
finally {
- if (yarnCluster != null) {
+ if (yarnCluster != null && !yarnCluster.isDetached()) {
logAndSysout("Shutting down YARN cluster");
yarnCluster.shutdown();
}
@@ -346,7 +367,7 @@ public class CliFrontend {
int parallelism = options.getParallelism();
LOG.info("Creating program plan dump");
- Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
+ Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism);
if (jsonPlan != null) {
@@ -555,12 +576,12 @@ public class CliFrontend {
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------
- protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
- LOG.info("Starting execution or program");
- JobExecutionResult execResult;
+ protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
+ LOG.info("Starting execution of program");
+ JobSubmissionResult execResult;
try {
client.setPrintStatusDuringExecution(true);
- execResult = client.run(program, parallelism, true);
+ execResult = client.run(program, parallelism, wait);
}
catch (ProgramInvocationException e) {
return handleError(e);
@@ -569,15 +590,33 @@ public class CliFrontend {
program.deleteExtractedLibraries();
}
- LOG.info("Program execution finished");
+ if(wait) {
+ LOG.info("Program execution finished");
+ }
- // we come here after the job has finished
+ // we come here after the job has finished (or the job has been submitted)
if (execResult != null) {
- System.out.println("Job Runtime: " + execResult.getNetRuntime());
- Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
- if (accumulatorsResult.size() > 0) {
- System.out.println("Accumulator Results: ");
- System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+ // if the job has been submitted to a detached YARN cluster, there won't be any
+ // exec results, but the object will be set (for the job id)
+ if (yarnCluster != null && yarnCluster.isDetached()) {
+ if(execResult.getJobID() == null) {
+ throw new RuntimeException("Error while starting job. No Job ID set.");
+ }
+ yarnCluster.stopAfterJob(execResult.getJobID());
+ yarnCluster.disconnect();
+ System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
+ return 0;
+ }
+ if (execResult instanceof JobExecutionResult) {
+ JobExecutionResult result = (JobExecutionResult) execResult;
+ System.out.println("Job Runtime: " + result.getNetRuntime());
+ Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
+ if (accumulatorsResult.size() > 0) {
+ System.out.println("Accumulator Results: ");
+ System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+ }
+ } else {
+ LOG.info("The Job did not return an execution result");
}
}
return 0;
@@ -681,26 +720,47 @@ public class CliFrontend {
LOG.info("JobManager is at " + jmActor.path());
return jmActor;
}
-
-
- protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName) throws Exception {
+ /**
+ *
+ * @param options
+ * @param classLoader
+ * @param programName
+ * @param userParallelism The parallelism requested by the user in the CLI frontend.
+ * @return
+ * @throws Exception
+ */
+ protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName, int userParallelism) throws Exception {
InetSocketAddress jobManagerAddress;
-
+ int maxSlots = -1;
if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
// user wants to run Flink in YARN cluster.
CommandLine commandLine = options.getCommandLine();
- AbstractFlinkYarnClient flinkYarnClient =
- CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
+ AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
if (flinkYarnClient == null) {
throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
}
+
+ // the number of slots available from YARN:
+ int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
+ if(yarnTmSlots == -1) {
+ yarnTmSlots = 1;
+ }
+ maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
+ if(userParallelism != -1) {
+ int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
+ logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
+ "Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
+ flinkYarnClient.setTaskManagerSlots(slotsPerTM);
+ }
+
try {
yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName);
+ yarnCluster.connectToCluster();
}
catch(Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
@@ -722,7 +782,7 @@ public class CliFrontend {
break;
}
} else {
- logAndSysout("No status updates from YARN cluster received so far. Waiting ...");
+ logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
}
try {
@@ -738,7 +798,7 @@ public class CliFrontend {
else {
jobManagerAddress = getJobManagerAddress(options);
}
- return new Client(jobManagerAddress, config, classLoader);
+ return new Client(jobManagerAddress, config, classLoader, maxSlots);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 25f31e3..7352457 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -122,12 +122,6 @@ public class FlinkYarnSessionCli {
} else {
LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar");
localJarPath = new Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
- if(!localJarPath.toString().contains("uberjar")) {
- // we need to have a proper uberjar because otherwise we don't have the required classes available on the cluster.
- // most likely the user did try to start yarn in a regular hadoop2 flink build (not a yarn package) (using ./bin/flink -m yarn-cluster)
- LOG.error("The detected jar file '"+localJarPath+"' is not a uberjar.");
- return null;
- }
}
flinkYarnClient.setLocalJarPath(localJarPath);
@@ -392,6 +386,10 @@ public class FlinkYarnSessionCli {
try {
yarnCluster = flinkYarnClient.deploy(null);
+ // only connect to cluster if its not a detached session.
+ if(!flinkYarnClient.isDetached()) {
+ yarnCluster.connectToCluster();
+ }
} catch (Exception e) {
System.err.println("Error while deploying YARN cluster: "+e.getMessage());
e.printStackTrace(System.err);
@@ -423,7 +421,7 @@ public class FlinkYarnSessionCli {
if (detachedMode) {
// print info and quit:
- LOG.info("The Flink YARN client has been started in detached mode. In order to stop" +
+ LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
"Please also note that the temporary files of the YARN session in {} will not be removed.",
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 6df0f79..5ee4e5d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -89,15 +89,20 @@ public class LocalExecutor extends PlanExecutor {
}
// --------------------------------------------------------------------------------------------
-
+
+ public static Configuration getConfiguration(LocalExecutor le) {
+ Configuration configuration = new Configuration();
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
+ configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
+ return configuration;
+ }
+
public void start() throws Exception {
synchronized (this.lock) {
if (this.flink == null) {
// create the embedded runtime
- Configuration configuration = new Configuration();
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
- configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
+ Configuration configuration = getConfiguration(this);
// start it up
this.flink = new LocalFlinkMiniCluster(configuration, true);
} else {
@@ -158,7 +163,7 @@ public class LocalExecutor extends PlanExecutor {
}
try {
- Optimizer pc = new Optimizer(new DataStatistics());
+ Optimizer pc = new Optimizer(new DataStatistics(), this.flink.getConfiguration());
OptimizedPlan op = pc.compile(plan);
JobGraphGenerator jgg = new JobGraphGenerator();
@@ -186,7 +191,7 @@ public class LocalExecutor extends PlanExecutor {
* @throws Exception
*/
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
- Optimizer pc = new Optimizer(new DataStatistics());
+ Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration(this));
OptimizedPlan op = pc.compile(plan);
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
@@ -242,7 +247,7 @@ public class LocalExecutor extends PlanExecutor {
LocalExecutor exec = new LocalExecutor();
try {
exec.start();
- Optimizer pc = new Optimizer(new DataStatistics());
+ Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.getConfiguration());
OptimizedPlan op = pc.compile(plan);
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 74a7ddb..1759b65 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.Client;
@@ -35,11 +36,13 @@ import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemoteExecutor extends PlanExecutor {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);
private final List<String> jarFiles;
-
private final InetSocketAddress address;
public RemoteExecutor(String hostname, int port) {
@@ -86,22 +89,34 @@ public class RemoteExecutor extends PlanExecutor {
}
public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
- Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
- return c.run(p, -1, true);
+ Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
+ JobSubmissionResult result = c.run(p, -1, true);
+ if(result instanceof JobExecutionResult) {
+ return (JobExecutionResult) result;
+ } else {
+ LOG.warn("The Client didn't return a JobExecutionResult");
+ return new JobExecutionResult(result.getJobID(), -1, null);
+ }
}
public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception {
File jarFile = new File(jarPath);
PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
- Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader());
- return c.run(program.getPlanWithJars(), -1, true);
+ Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1);
+ JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
+ if(result instanceof JobExecutionResult) {
+ return (JobExecutionResult) result;
+ } else {
+ LOG.warn("The Client didn't return a JobExecutionResult");
+ return new JobExecutionResult(result.getJobID(), -1, null);
+ }
}
@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
JobWithJars p = new JobWithJars(plan, this.jarFiles);
- Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
+ Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 9a578bc..6dff9e2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -25,6 +25,8 @@ import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.List;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
@@ -69,6 +71,19 @@ public class Client {
private final Optimizer compiler; // the compiler to compile the jobs
private boolean printStatusDuringExecution = false;
+
+ /**
+ * If != -1, this field specifies the total number of available slots on the cluster
+ * conntected to the client.
+ */
+ private int maxSlots = -1;
+
+ /**
+ * ID of the last job submitted with this client.
+ */
+ private JobID lastJobId = null;
+
+ private ClassLoader userCodeClassLoader; // TODO: use userCodeClassloader to deserialize accumulator results.
// ------------------------------------------------------------------------
// Construction
@@ -80,7 +95,7 @@ public class Client {
*
* @param jobManagerAddress Address and port of the job-manager.
*/
- public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader) {
+ public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader, int maxSlots) {
Preconditions.checkNotNull(config, "Configuration is null");
this.configuration = config;
@@ -88,7 +103,9 @@ public class Client {
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
- this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator());
+ this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+ this.userCodeClassLoader = userCodeClassLoader;
+ this.maxSlots = maxSlots;
}
/**
@@ -112,7 +129,8 @@ public class Client {
throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
}
- this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator());
+ this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
+ this.userCodeClassLoader = userCodeClassLoader;
}
public void setPrintStatusDuringExecution(boolean print) {
@@ -126,6 +144,14 @@ public class Client {
public int getJobManagerPort() {
return this.configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
}
+
+ /**
+ * @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
+ * connected to this client.
+ */
+ public int getMaxSlots() {
+ return this.maxSlots;
+ }
// ------------------------------------------------------------------------
// Compilation and Submission
@@ -191,8 +217,10 @@ public class Client {
public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
+ LOG.debug("Changing plan default parallelism from {} to {}",p.getDefaultParallelism(), parallelism);
p.setDefaultParallelism(parallelism);
}
+ LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
return this.compiler.compile(p);
}
@@ -230,49 +258,31 @@ public class Client {
return job;
}
- public JobExecutionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
+ public JobSubmissionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
return run(prog.getPlanWithJars(), parallelism, wait);
}
else if (prog.isUsingInteractiveMode()) {
-
- ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism);
+ LOG.info("Starting program in interactive mode");
+ ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait);
ContextEnvironment.enableLocalExecution(false);
- if (wait) {
- // invoke here
- try {
- prog.invokeInteractiveModeForExecution();
- }
- finally {
- ContextEnvironment.enableLocalExecution(true);
- }
+ // invoke here
+ try {
+ prog.invokeInteractiveModeForExecution();
}
- else {
- // invoke in the background
- Thread backGroundRunner = new Thread("Program Runner") {
- public void run() {
- try {
- prog.invokeInteractiveModeForExecution();
- }
- catch (Throwable t) {
- LOG.error("The program execution failed.", t);
- }
- finally {
- ContextEnvironment.enableLocalExecution(true);
- }
- }
- };
- backGroundRunner.start();
+ finally {
+ ContextEnvironment.enableLocalExecution(true);
}
- return null;
+
+ return new JobSubmissionResult(lastJobId);
}
else {
throw new RuntimeException();
}
}
- public JobExecutionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
+ public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
return run(optimizedPlan, prog.getAllLibraries(), wait);
}
@@ -291,17 +301,18 @@ public class Client {
* i.e. the job-manager is unreachable, or due to the fact that the
* parallel execution failed.
*/
- public JobExecutionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
+ public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
}
- public JobExecutionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
+ public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
JobGraph job = getJobGraph(compiledPlan, libraries);
+ this.lastJobId = job.getJobID();
return run(job, wait);
}
- public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
+ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
final String hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
if (hostname == null) {
@@ -335,6 +346,8 @@ public class Client {
}
else {
JobClient.submitJobDetached(jobGraph, client, timeout);
+ // return a "Fake" execution result with the JobId
+ return new JobSubmissionResult(jobGraph.getJobID());
}
}
catch (JobExecutionException e) {
@@ -347,8 +360,6 @@ public class Client {
actorSystem.shutdown();
actorSystem.awaitTermination();
}
-
- return new JobExecutionResult(-1, null);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 8d5fe17..55b579a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,37 +22,51 @@ import java.io.File;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- *
+ * Execution Environment for remote execution with the Client.
*/
public class ContextEnvironment extends ExecutionEnvironment {
-
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContextEnvironment.class);
+
private final Client client;
private final List<File> jarFilesToAttach;
private final ClassLoader userCodeClassLoader;
+
+ private final boolean wait;
- public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader) {
+ public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) {
this.client = remoteConnection;
this.jarFilesToAttach = jarFiles;
this.userCodeClassLoader = userCodeClassLoader;
+ this.wait = wait;
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);
-
- return this.client.run(toRun, getParallelism(), true);
+
+ JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
+ if(result instanceof JobExecutionResult) {
+ return (JobExecutionResult) result;
+ } else {
+ LOG.warn("The Client didn't return a JobExecutionResult");
+ return new JobExecutionResult(result.getJobID(), -1, null);
+ }
}
@Override
@@ -60,7 +74,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
Plan p = createProgramPlan("unnamed job");
OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getParallelism());
-
+
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
}
@@ -83,15 +97,15 @@ public class ContextEnvironment extends ExecutionEnvironment {
// --------------------------------------------------------------------------------------------
static void setAsContext(Client client, List<File> jarFilesToAttach,
- ClassLoader userCodeClassLoader, int defaultParallelism)
+ ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
{
- initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism));
+ initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait));
}
protected static void enableLocalExecution(boolean enabled) {
ExecutionEnvironment.enableLocalExecution(enabled);
}
-
+
// --------------------------------------------------------------------------------------------
public static class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
@@ -103,20 +117,23 @@ public class ContextEnvironment extends ExecutionEnvironment {
private final ClassLoader userCodeClassLoader;
private final int defaultParallelism;
+
+ private final boolean wait;
public ContextEnvironmentFactory(Client client, List<File> jarFilesToAttach,
- ClassLoader userCodeClassLoader, int defaultParallelism)
+ ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait)
{
this.client = client;
this.jarFilesToAttach = jarFilesToAttach;
this.userCodeClassLoader = userCodeClassLoader;
this.defaultParallelism = defaultParallelism;
+ this.wait = wait;
}
@Override
public ExecutionEnvironment createExecutionEnvironment() {
- ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader);
+ ContextEnvironment env = new ContextEnvironment(client, jarFilesToAttach, userCodeClassLoader, wait);
if (defaultParallelism > 0) {
env.setParallelism(defaultParallelism);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index 6d54b58..cb2585d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -105,7 +105,7 @@ public class CliFrontendInfoTest {
}
@Override
- protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName)
+ protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName, int par)
throws Exception
{
return new TestClient(expectedDop);
@@ -118,7 +118,7 @@ public class CliFrontendInfoTest {
private TestClient(int expectedDop) throws Exception {
super(new InetSocketAddress(InetAddress.getLocalHost(), 6176),
- new Configuration(), CliFrontendInfoTest.class.getClassLoader());
+ new Configuration(), CliFrontendInfoTest.class.getClassLoader(), -1);
this.expectedDop = expectedDop;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 2712c19..3224e0f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -22,7 +22,7 @@ import akka.actor.*;
import akka.testkit.JavaTestKit;
import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.junit.AfterClass;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 8ed99f9..034ee4e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -93,7 +93,7 @@ public class CliFrontendRunTest {
}
@Override
- protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
+ protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
assertEquals(this.expectedParallelim, parallelism);
return 0;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 09a3e1a..be6c19a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -91,7 +91,7 @@ public class ClientConnectionTest {
vertex.setInvokableClass(TestInvokable.class);
final JobGraph jg = new JobGraph("Test Job", vertex);
- final Client client = new Client(unreachableEndpoint, config, getClass().getClassLoader());
+ final Client client = new Client(unreachableEndpoint, config, getClass().getClassLoader(), -1);
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 9278c7a..22865ed 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -24,6 +24,7 @@ import akka.actor.Status;
import akka.actor.UntypedActor;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.DataStatistics;
@@ -35,7 +36,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.net.NetUtils;
import org.junit.After;
@@ -101,7 +102,7 @@ public class ClientTest {
when(program.getPlanWithJars()).thenReturn(planWithJarsMock);
when(planWithJarsMock.getPlan()).thenReturn(planMock);
- whenNew(Optimizer.class).withArguments(any(DataStatistics.class), any(CostEstimator.class)).thenReturn(this.compilerMock);
+ whenNew(Optimizer.class).withArguments(any(DataStatistics.class), any(CostEstimator.class), any(Configuration.class)).thenReturn(this.compilerMock);
when(compilerMock.compile(planMock)).thenReturn(optimizedPlanMock);
whenNew(JobGraphGenerator.class).withNoArguments().thenReturn(generatorMock);
@@ -139,11 +140,9 @@ public class ClientTest {
jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
Client out = new Client(config, getClass().getClassLoader());
- JobExecutionResult result = out.run(program.getPlanWithJars(), -1, false);
+ JobSubmissionResult result = out.run(program.getPlanWithJars(), -1, false);
assertNotNull(result);
- assertEquals(-1, result.getNetRuntime());
- assertNull(result.getAllAccumulatorResults());
program.deleteExtractedLibraries();
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index e2b7935..f156f77 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -71,7 +71,8 @@ public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
env.getExecutionPlan();
env.createProgramPlan();
} catch (Exception e) {
- fail("Cannot run both #getExecutionPlan and #execute.");
+ e.printStackTrace();
+ fail("Cannot run both #getExecutionPlan and #execute. Message: "+e.getMessage());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index 8aecff3..67b406d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -44,7 +44,7 @@ public class ExecutionPlanCreationTest {
InetAddress mockAddress = InetAddress.getLocalHost();
InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
- Client client = new Client(mockJmAddress, new Configuration(), getClass().getClassLoader());
+ Client client = new Client(mockJmAddress, new Configuration(), getClass().getClassLoader(), -1);
OptimizedPlan op = (OptimizedPlan) client.getOptimizedPlan(prg, -1);
assertNotNull(op);
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index 406cfe9..68506ae 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -24,7 +24,7 @@ import java.util.Map;
* The result of a job execution. Gives access to the execution time of the job,
* and to all accumulators created by this job.
*/
-public class JobExecutionResult {
+public class JobExecutionResult extends JobSubmissionResult {
private long netRuntime;
private Map<String, Object> accumulatorResults;
@@ -32,10 +32,12 @@ public class JobExecutionResult {
/**
* Creates a new JobExecutionResult.
*
+ * @param jobID
* @param netRuntime The net runtime of the job (excluding pre-flight phase like the optimizer)
* @param accumulators A map of all accumulators produced by the job.
*/
- public JobExecutionResult(long netRuntime, Map<String, Object> accumulators) {
+ public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulators) {
+ super(jobID);
this.netRuntime = netRuntime;
this.accumulatorResults = accumulators;
}
@@ -92,5 +94,6 @@ public class JobExecutionResult {
return (Integer) result;
}
+
// TODO Create convenience methods for the other shipped accumulator types
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobID.java b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
new file mode 100644
index 0000000..7478da4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import javax.xml.bind.DatatypeConverter;
+import org.apache.flink.util.AbstractID;
+import java.nio.ByteBuffer;
+
+/**
+ * Unique Job Identifier
+ */
+public final class JobID extends AbstractID {
+
+ private static final long serialVersionUID = 1L;
+
+ public JobID() {
+ super();
+ }
+
+ public JobID(long lowerPart, long upperPart) {
+ super(lowerPart, upperPart);
+ }
+
+ public JobID(byte[] bytes) {
+ super(bytes);
+ }
+
+ public static JobID generate() {
+ return new JobID();
+ }
+
+ public static JobID fromByteArray(byte[] bytes) {
+ return new JobID(bytes);
+ }
+
+ public static JobID fromByteBuffer(ByteBuffer buf) {
+ long lower = buf.getLong();
+ long upper = buf.getLong();
+ return new JobID(lower, upper);
+ }
+
+ public static JobID fromHexString(String hexString) {
+ return new JobID(DatatypeConverter.parseHexBinary(hexString));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
new file mode 100644
index 0000000..5cea9d5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+/**
+ * The result of a job submission.
+ * Contains the JobID
+ */
+public class JobSubmissionResult {
+ private JobID jobID;
+
+ public JobSubmissionResult(JobID jobID) {
+ this.jobID = jobID;
+ }
+
+ /**
+ * Returns the JobID assigned to the job by the Flink runtime.
+ *
+ * @return jobID, or null if the job has been executed on a runtime without JobIDs or if the execution failed.
+ */
+ public JobID getJobID() {
+ return jobID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 78ad930..f605113 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -100,7 +100,7 @@ public class CollectionExecutor {
long endTime = System.currentTimeMillis();
Map<String, Object> accumulatorResults = AccumulatorHelper.toResultMap(accumulators);
- return new JobExecutionResult(endTime - startTime, accumulatorResults);
+ return new JobExecutionResult(null, endTime - startTime, accumulatorResults);
}
private List<?> execute(Operator<?> operator) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
index 34ebbff..9c56e61 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
@@ -29,6 +29,7 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m
# Log output from org.apache.flink.yarn to the console. This is used by the
# CliFrontend class when using a per-job YARN cluster.
log4j.logger.org.apache.flink.yarn=INFO, console
+log4j.logger.org.apache.flink.client.FlinkYarnSessionCli=INFO, console
log4j.logger.org.apache.hadoop=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index 07aa2ce..bfd5e85 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -54,7 +54,6 @@ public class WordCount {
public static void main(String[] args) throws Exception {
-
if(!parseParameters(args)) {
return;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/pom.xml
----------------------------------------------------------------------
diff --git a/flink-optimizer/pom.xml b/flink-optimizer/pom.xml
index 55764e9..4d37aea 100644
--- a/flink-optimizer/pom.xml
+++ b/flink-optimizer/pom.xml
@@ -52,7 +52,7 @@ under the License.
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
</dependency>
-
+
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -60,4 +60,21 @@ under the License.
</dependency>
</dependencies>
+ <!-- Build a test-jar because flink-tests needs the CompilerTestBase -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index c80cfc2..c4d70f6 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.traversals.BinaryUnionReplacer;
import org.apache.flink.optimizer.traversals.BranchesVisitor;
import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
@@ -45,7 +46,6 @@ import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.postpass.OptimizerPostPass;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.util.InstantiationUtil;
/**
@@ -303,8 +303,8 @@ public class Optimizer {
* unknown sizes and hence use only the heuristic cost functions, which result in the selection
* of the most robust execution strategies.
*/
- public Optimizer() {
- this(null, new DefaultCostEstimator());
+ public Optimizer(Configuration config) {
+ this(null, new DefaultCostEstimator(), config);
}
/**
@@ -314,8 +314,8 @@ public class Optimizer {
* @param stats
* The statistics to be used to determine the input properties.
*/
- public Optimizer(DataStatistics stats) {
- this(stats, new DefaultCostEstimator());
+ public Optimizer(DataStatistics stats, Configuration config) {
+ this(stats, new DefaultCostEstimator(), config);
}
/**
@@ -328,8 +328,8 @@ public class Optimizer {
*
* @param estimator The cost estimator to use to cost the individual operations.
*/
- public Optimizer(CostEstimator estimator) {
- this(null, estimator);
+ public Optimizer(CostEstimator estimator, Configuration config) {
+ this(null, estimator, config);
}
/**
@@ -343,17 +343,17 @@ public class Optimizer {
* @param estimator
* The <tt>CostEstimator</tt> to use to cost the individual operations.
*/
- public Optimizer(DataStatistics stats, CostEstimator estimator) {
+ public Optimizer(DataStatistics stats, CostEstimator estimator, Configuration config) {
this.statistics = stats;
this.costEstimator = estimator;
// determine the default parallelism
// check for old key string first, then for new one
- this.defaultParallelism = GlobalConfiguration.getInteger(
+ this.defaultParallelism = config.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
ConfigConstants.DEFAULT_PARALLELISM);
// now check for new one which overwrites old values
- this.defaultParallelism = GlobalConfiguration.getInteger(
+ this.defaultParallelism = config.getInteger(
ConfigConstants.DEFAULT_PARALLELISM_KEY,
this.defaultParallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
index 1e4bafb..0c50536 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.optimizer.util.DummyCrossStub;
import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 2df08a0..94ff41a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
index c7ad2da..57c53ff 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
index 47efeb1..1a4cd18 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
@@ -22,6 +22,7 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
index eba07f1..61d407a 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
deleted file mode 100644
index 4eed236..0000000
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/CompilerTestBase.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.util.OperatingSystem;
-import org.apache.flink.util.Visitor;
-import org.junit.Before;
-
-/**
- * Base class for Optimizer tests. Offers utility methods to trigger optimization
- * of a program and to fetch the nodes in an optimizer plan that correspond
- * the the node in the program plan.
- */
-public abstract class CompilerTestBase implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random";
-
- protected static final String OUT_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/null";
-
- protected static final int DEFAULT_PARALLELISM = 8;
-
- private static final String CACHE_KEY = "cachekey";
-
- // ------------------------------------------------------------------------
-
- protected transient DataStatistics dataStats;
-
- protected transient Optimizer withStatsCompiler;
-
- protected transient Optimizer noStatsCompiler;
-
- private transient int statCounter;
-
- // ------------------------------------------------------------------------
-
- @Before
- public void setup() {
- this.dataStats = new DataStatistics();
- this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
- this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
-
- this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
- this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
- }
-
- // ------------------------------------------------------------------------
-
- public OptimizedPlan compileWithStats(Plan p) {
- return this.withStatsCompiler.compile(p);
- }
-
- public OptimizedPlan compileNoStats(Plan p) {
- return this.noStatsCompiler.compile(p);
- }
-
- public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) {
- setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
- }
-
- public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) {
- final String key = CACHE_KEY + this.statCounter++;
- this.dataStats.cacheBaseStatistics(stats, key);
- source.setStatisticsKey(key);
- }
-
- // ------------------------------------------------------------------------
-
- public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
- return new OptimizerPlanNodeResolver(plan);
- }
-
- public static final class OptimizerPlanNodeResolver {
-
- private final Map<String, ArrayList<PlanNode>> map;
-
- OptimizerPlanNodeResolver(OptimizedPlan p) {
- HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>();
-
- for (PlanNode n : p.getAllNodes()) {
- Operator<?> c = n.getOriginalOptimizerNode().getOperator();
- String name = c.getName();
-
- ArrayList<PlanNode> list = map.get(name);
- if (list == null) {
- list = new ArrayList<PlanNode>(2);
- map.put(name, list);
- }
-
- // check whether this node is a child of a node with the same contract (aka combiner)
- boolean shouldAdd = true;
- for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) {
- PlanNode in = iter.next();
- if (in.getOriginalOptimizerNode().getOperator() == c) {
- // is this the child or is our node the child
- if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) {
- SingleInputPlanNode thisNode = (SingleInputPlanNode) n;
- SingleInputPlanNode otherNode = (SingleInputPlanNode) in;
-
- if (thisNode.getPredecessor() == otherNode) {
- // other node is child, remove it
- iter.remove();
- } else if (otherNode.getPredecessor() == thisNode) {
- shouldAdd = false;
- }
- } else {
- throw new RuntimeException("Unrecodnized case in test.");
- }
- }
- }
-
- if (shouldAdd) {
- list.add(n);
- }
- }
-
- this.map = map;
- }
-
-
- @SuppressWarnings("unchecked")
- public <T extends PlanNode> T getNode(String name) {
- List<PlanNode> nodes = this.map.get(name);
- if (nodes == null || nodes.isEmpty()) {
- throw new RuntimeException("No node found with the given name.");
- } else if (nodes.size() != 1) {
- throw new RuntimeException("Multiple nodes found with the given name.");
- } else {
- return (T) nodes.get(0);
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T extends PlanNode> T getNode(String name, Class<? extends Function> stubClass) {
- List<PlanNode> nodes = this.map.get(name);
- if (nodes == null || nodes.isEmpty()) {
- throw new RuntimeException("No node found with the given name and stub class.");
- } else {
- PlanNode found = null;
- for (PlanNode node : nodes) {
- if (node.getClass() == stubClass) {
- if (found == null) {
- found = node;
- } else {
- throw new RuntimeException("Multiple nodes found with the given name and stub class.");
- }
- }
- }
- if (found == null) {
- throw new RuntimeException("No node found with the given name and stub class.");
- } else {
- return (T) found;
- }
- }
- }
-
- public List<PlanNode> getNodes(String name) {
- List<PlanNode> nodes = this.map.get(name);
- if (nodes == null || nodes.isEmpty()) {
- throw new RuntimeException("No node found with the given name.");
- } else {
- return new ArrayList<PlanNode>(nodes);
- }
- }
- }
-
- /**
- * Collects all DataSources of a plan to add statistics
- *
- */
- public static class SourceCollectorVisitor implements Visitor<Operator<?>> {
-
- protected final List<GenericDataSourceBase<?, ?>> sources = new ArrayList<GenericDataSourceBase<?, ?>>(4);
-
- @Override
- public boolean preVisit(Operator<?> visitable) {
-
- if(visitable instanceof GenericDataSourceBase) {
- sources.add((GenericDataSourceBase<?, ?>) visitable);
- }
- else if(visitable instanceof BulkIterationBase) {
- ((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
- }
-
- return true;
- }
-
- @Override
- public void postVisit(Operator<?> visitable) {}
-
- public List<GenericDataSourceBase<?, ?>> getSources() {
- return this.sources;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
index aaee975..bb3aa47 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DisjointDataFlowsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 3b7eae7..7865861 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
index 77d185d..76b3b0e 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/GroupOrderTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.optimizer.util.IdentityReduce;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
index 6dadc19..52e9a2d 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/HardPlansCompilationTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.optimizer.util.DummyInputFormat;
import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.optimizer.util.IdentityMap;
import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.types.IntValue;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
index 810ec0e..0afbe93 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/IterationsCompilerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer;
import static org.junit.Assert.*;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
index e65758f..34fc085 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/NestedIterationsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@SuppressWarnings({"serial", "unchecked"})
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
index a54136a..8236f10 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ParallelismChangeTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.optimizer.util.DummyMatchStub;
import org.apache.flink.optimizer.util.DummyOutputFormat;
import org.apache.flink.optimizer.util.IdentityMap;
import org.apache.flink.optimizer.util.IdentityReduce;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Assert;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.record.operators.FileDataSink;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
index 2b42f85..72effc1 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitionPushdownTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
@SuppressWarnings("serial")
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index 16684dc..f42eb02 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.util.Collector;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
index 31f71d1..84f6377 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PipelineBreakerTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.*;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.junit.Test;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;