You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 06:06:45 UTC
[1/3] flink git commit: [FLINK-6716] Suppress load errors in
checkstyle JavadocMethod
Repository: flink
Updated Branches:
refs/heads/master 6771a4d18 -> c793ea41d
[FLINK-6716] Suppress load errors in checkstyle JavadocMethod
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db397b86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db397b86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db397b86
Branch: refs/heads/master
Commit: db397b86907093acbf43877ab9c58cfc2b9a1090
Parents: 6771a4d
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Thu May 25 10:08:54 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 26 08:03:13 2017 +0200
----------------------------------------------------------------------
tools/maven/strict-checkstyle.xml | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db397b86/tools/maven/strict-checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/strict-checkstyle.xml b/tools/maven/strict-checkstyle.xml
index 931ec1b..2dd4881 100644
--- a/tools/maven/strict-checkstyle.xml
+++ b/tools/maven/strict-checkstyle.xml
@@ -262,6 +262,8 @@ This file is based on the checkstyle file of Apache Beam.
<property name="allowMissingThrowsTags" value="true"/>
<property name="allowThrowsTagsForSubclasses" value="true"/>
<property name="allowUndeclaredRTE" value="true"/>
+ <!-- This check sometimes failed with "Unable to get class information for @throws tag" for custom exceptions -->
+ <property name="suppressLoadErrors" value="true"/>
</module>
<!-- Check that paragraph tags are used correctly in Javadoc. -->
[3/3] flink git commit: [FLINK-6719] Activate strict checkstyle for
flink-clients
Posted by ch...@apache.org.
[FLINK-6719] Activate strict checkstyle for flink-clients
This closes #3989.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c793ea41
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c793ea41
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c793ea41
Branch: refs/heads/master
Commit: c793ea41d88fe84fa97d825728ad95f35e27ef82
Parents: db397b8
Author: zentol <ch...@apache.org>
Authored: Thu May 25 13:02:36 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 26 08:03:14 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 52 ++++---
.../org/apache/flink/client/ClientUtils.java | 3 +-
.../org/apache/flink/client/LocalExecutor.java | 47 ++++---
.../org/apache/flink/client/RemoteExecutor.java | 23 ++--
.../apache/flink/client/cli/CancelOptions.java | 3 +-
.../flink/client/cli/CliArgsException.java | 3 +-
.../flink/client/cli/CliFrontendParser.java | 12 +-
.../flink/client/cli/CommandLineOptions.java | 3 +-
.../flink/client/cli/CustomCommandLine.java | 15 ++-
.../org/apache/flink/client/cli/DefaultCLI.java | 7 +-
.../apache/flink/client/cli/InfoOptions.java | 3 +-
.../apache/flink/client/cli/ListOptions.java | 5 +-
.../apache/flink/client/cli/ProgramOptions.java | 4 +-
.../org/apache/flink/client/cli/RunOptions.java | 3 +-
.../flink/client/cli/SavepointOptions.java | 3 +-
.../apache/flink/client/cli/StopOptions.java | 3 +-
.../client/deployment/ClusterDescriptor.java | 5 +-
.../deployment/StandaloneClusterDescriptor.java | 4 +-
.../flink/client/program/ClusterClient.java | 88 ++++++------
.../client/program/ContextEnvironment.java | 12 +-
.../program/ContextEnvironmentFactory.java | 9 +-
.../client/program/DetachedEnvironment.java | 4 +
.../flink/client/program/JobWithJars.java | 25 ++--
.../program/OptimizerPlanEnvironment.java | 15 ++-
.../flink/client/program/PackagedProgram.java | 135 +++++++++----------
.../client/program/PreviewPlanEnvironment.java | 5 +-
.../program/ProgramInvocationException.java | 7 +-
.../client/program/StandaloneClusterClient.java | 7 +-
.../CliFrontendAddressConfigurationTest.java | 2 +-
.../flink/client/CliFrontendInfoTest.java | 14 +-
.../flink/client/CliFrontendListCancelTest.java | 31 +++--
.../client/CliFrontendPackageProgramTest.java | 52 +++----
.../apache/flink/client/CliFrontendRunTest.java | 12 +-
.../flink/client/CliFrontendSavepointTest.java | 15 ++-
.../flink/client/CliFrontendStopTest.java | 20 ++-
.../flink/client/CliFrontendTestUtils.java | 33 +++--
.../RemoteExecutorHostnameResolutionTest.java | 6 +-
.../client/program/ClientConnectionTest.java | 13 +-
.../apache/flink/client/program/ClientTest.java | 72 ++++++----
.../flink/client/program/ClusterClientTest.java | 6 +-
.../ExecutionPlanAfterExecutionTest.java | 25 ++--
.../program/ExecutionPlanCreationTest.java | 44 +++---
...rRetrievalServiceHostnameResolutionTest.java | 5 +-
.../client/program/PackagedProgramTest.java | 15 ++-
.../testjar/JobWithExternalDependency.java | 5 +-
.../apache/flink/client/testjar/WordCount.java | 55 ++++----
46 files changed, 503 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 62fa402..a22cb37 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -18,7 +18,6 @@
package org.apache.flink.client;
-import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -74,12 +73,10 @@ import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+
+import org.apache.commons.cli.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.io.FileNotFoundException;
@@ -100,6 +97,11 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
@@ -139,8 +141,6 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
-
-
private final Configuration config;
private final FiniteDuration clientTimeout;
@@ -173,13 +173,12 @@ public class CliFrontend {
this.clientTimeout = AkkaUtils.getClientTimeout(config);
}
-
// --------------------------------------------------------------------------------------------
// Getter & Setter
// --------------------------------------------------------------------------------------------
/**
- * Getter which returns a copy of the associated configuration
+ * Getter which returns a copy of the associated configuration.
*
* @return Copy of the associated configuration
*/
@@ -191,14 +190,13 @@ public class CliFrontend {
return copiedConfiguration;
}
-
// --------------------------------------------------------------------------------------------
// Execute Actions
// --------------------------------------------------------------------------------------------
/**
* Executes the run action.
- *
+ *
* @param args Command line arguments for the run action.
*/
protected int run(String[] args) {
@@ -251,7 +249,7 @@ public class CliFrontend {
LOG.debug("User parallelism is set to {}", userParallelism);
if (client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("
- + client.getMaxSlots()+"). "
+ + client.getMaxSlots() + "). "
+ "To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}
@@ -277,7 +275,7 @@ public class CliFrontend {
/**
* Executes the info action.
- *
+ *
* @param args Command line arguments for the info action.
*/
protected int info(String[] args) {
@@ -323,7 +321,7 @@ public class CliFrontend {
Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
FlinkPlan flinkPlan = ClusterClient.getOptimizedPlan(compiler, program, parallelism);
-
+
String jsonPlan = null;
if (flinkPlan instanceof OptimizedPlan) {
jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
@@ -361,7 +359,7 @@ public class CliFrontend {
/**
* Executes the list action.
- *
+ *
* @param args Command line arguments for the list action.
*/
protected int list(String[] args) {
@@ -437,12 +435,12 @@ public class CliFrontend {
Comparator<JobStatusMessage> njec = new Comparator<JobStatusMessage>(){
@Override
public int compare(JobStatusMessage o1, JobStatusMessage o2) {
- return (int)(o1.getStartTime()-o2.getStartTime());
+ return (int) (o1.getStartTime() - o2.getStartTime());
}
};
if (running) {
- if(runningJobs.size() == 0) {
+ if (runningJobs.size() == 0) {
System.out.println("No running jobs.");
}
else {
@@ -464,7 +462,7 @@ public class CliFrontend {
Collections.sort(scheduledJobs, njec);
System.out.println("----------------------- Scheduled Jobs -----------------------");
- for(JobStatusMessage rj : scheduledJobs) {
+ for (JobStatusMessage rj : scheduledJobs) {
System.out.println(df.format(new Date(rj.getStartTime()))
+ " : " + rj.getJobId() + " : " + rj.getJobName());
}
@@ -485,7 +483,7 @@ public class CliFrontend {
/**
* Executes the STOP action.
- *
+ *
* @param args Command line arguments for the stop action.
*/
protected int stop(String[] args) {
@@ -544,7 +542,7 @@ public class CliFrontend {
/**
* Executes the CANCEL action.
- *
+ *
* @param args Command line arguments for the cancel action.
*/
protected int cancel(String[] args) {
@@ -877,8 +875,7 @@ public class CliFrontend {
* @throws org.apache.flink.client.program.ProgramInvocationException
*/
protected PackagedProgram buildProgram(ProgramOptions options)
- throws FileNotFoundException, ProgramInvocationException
- {
+ throws FileNotFoundException, ProgramInvocationException {
String[] programArgs = options.getProgramArgs();
String jarFilePath = options.getJarFilePath();
List<URL> classpaths = options.getClasspaths();
@@ -910,7 +907,7 @@ public class CliFrontend {
}
/**
- * Updates the associated configuration with the given command line options
+ * Updates the associated configuration with the given command line options.
*
* @param options Command line options
*/
@@ -1023,7 +1020,7 @@ public class CliFrontend {
/**
* Displays an exception message.
- *
+ *
* @param t The exception to display.
* @return The return code for the process.
*/
@@ -1061,7 +1058,7 @@ public class CliFrontend {
/**
* Parses the command line arguments and starts the requested action.
- *
+ *
* @param args command line arguments of the client.
* @return The return code of the program
*/
@@ -1118,7 +1115,7 @@ public class CliFrontend {
}
/**
- * Submits the job based on the arguments
+ * Submits the job based on the arguments.
*/
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
@@ -1172,9 +1169,8 @@ public class CliFrontend {
return location;
}
-
/**
- * Writes the given job manager address to the associated configuration object
+ * Writes the given job manager address to the associated configuration object.
*
* @param address Address to write to the configuration
* @param config The config to write to
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index f1ed93e..03f2f8e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client;
import java.net.InetSocketAddress;
@@ -22,7 +23,7 @@ import java.net.URI;
import java.net.URISyntaxException;
/**
- * A class that provides some utility methods
+ * A class that provides some utility methods.
*/
public class ClientUtils {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index 20a3366..abd35fc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -18,8 +18,6 @@
package org.apache.flink.client;
-import java.util.List;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
@@ -28,18 +26,20 @@ import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import java.util.List;
+
/**
* A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance.
*
@@ -50,24 +50,24 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
* then this executor needs to be explicitly started, to keep running across several executions.</p>
*/
public class LocalExecutor extends PlanExecutor {
-
+
private static final boolean DEFAULT_OVERWRITE = false;
private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
- /** we lock to ensure singleton execution */
+ /** we lock to ensure singleton execution. */
private final Object lock = new Object();
- /** The mini cluster on which to execute the local programs */
+ /** The mini cluster on which to execute the local programs. */
private LocalFlinkMiniCluster flink;
- /** Custom user configuration for the execution */
+ /** Custom user configuration for the execution. */
private final Configuration configuration;
- /** Config value for how many slots to provide in the local cluster */
+ /** Config value for how many slots to provide in the local cluster. */
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
- /** Config flag whether to overwrite existing files by default */
+ /** Config flag whether to overwrite existing files by default. */
private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
// ------------------------------------------------------------------------
@@ -93,7 +93,7 @@ public class LocalExecutor extends PlanExecutor {
}
public void setTaskManagerNumSlots(int taskManagerNumSlots) {
- this.taskManagerNumSlots = taskManagerNumSlots;
+ this.taskManagerNumSlots = taskManagerNumSlots;
}
public int getTaskManagerNumSlots() {
@@ -119,7 +119,7 @@ public class LocalExecutor extends PlanExecutor {
}
}
}
-
+
@Override
public void stop() throws Exception {
synchronized (lock) {
@@ -139,14 +139,14 @@ public class LocalExecutor extends PlanExecutor {
/**
* Executes the given program on a local runtime and waits for the job to finish.
- *
+ *
* <p>If the executor has not been started before, this starts the executor and shuts it down
* after the job finished. If the job runs in session mode, the executor is kept alive until
* no more references to the executor exist.</p>
- *
+ *
* @param plan The plan of the program to execute.
* @return The net runtime of the program, in milliseconds.
- *
+ *
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
*/
@@ -236,18 +236,17 @@ public class LocalExecutor extends PlanExecutor {
return configuration;
}
-
// --------------------------------------------------------------------------------------------
// Static variants that internally bring up an instance and shut it down after the execution
// --------------------------------------------------------------------------------------------
/**
* Executes the given program.
- *
+ *
* @param pa The program.
* @param args The parameters.
* @return The execution result of the program.
- *
+ *
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
*/
@@ -257,10 +256,10 @@ public class LocalExecutor extends PlanExecutor {
/**
* Executes the given dataflow plan.
- *
- * @param plan The dataflow plan.
+ *
+ * @param plan The dataflow plan.
* @return The execution result.
- *
+ *
* @throws Exception Thrown, if either the startup of the local execution context, or the execution
* caused an exception.
*/
@@ -270,7 +269,7 @@ public class LocalExecutor extends PlanExecutor {
/**
* Creates a JSON representation of the given dataflow's execution plan.
- *
+ *
* @param plan The dataflow plan.
* @return The dataflow's execution plan, as a JSON string.
* @throws Exception Thrown, if the optimization process that creates the execution plan failed.
@@ -287,7 +286,7 @@ public class LocalExecutor extends PlanExecutor {
/**
* Creates a JSON representation of the given dataflow plan.
- *
+ *
* @param plan The dataflow plan.
* @return The dataflow plan (prior to optimization) as a JSON string.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 86b36b3..4a3cc74 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -18,11 +18,6 @@
package org.apache.flink.client;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
@@ -30,21 +25,26 @@ import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.configuration.Configuration;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
/**
* The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program
* and ships it to a remote Flink cluster for execution.
- *
+ *
* <p>The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the
* set of libraries that need to be shipped together with the program.</p>
- *
+ *
* <p>The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to
* remotely execute program parts.</p>
*/
@@ -62,7 +62,6 @@ public class RemoteExecutor extends PlanExecutor {
private int defaultParallelism = 1;
-
public RemoteExecutor(String hostname, int port) {
this(hostname, port, new Configuration(), Collections.<URL>emptyList(),
Collections.<URL>emptyList());
@@ -109,7 +108,6 @@ public class RemoteExecutor extends PlanExecutor {
this.jarFiles = jarFiles;
this.globalClasspaths = globalClasspaths;
-
clientConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, inet.getHostName());
clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort());
}
@@ -134,7 +132,7 @@ public class RemoteExecutor extends PlanExecutor {
/**
* Gets the parallelism that will be used when the program does not define
* any parallelism at all.
- *
+ *
* @return The default parallelism for the executor.
*/
public int getDefaultParallelism() {
@@ -145,7 +143,6 @@ public class RemoteExecutor extends PlanExecutor {
// Startup & Shutdown
// ------------------------------------------------------------------------
-
@Override
public void start() throws Exception {
synchronized (lock) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
index 54e1a23..62cede2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
@@ -22,7 +23,7 @@ import org.apache.commons.cli.CommandLine;
import static org.apache.flink.client.cli.CliFrontendParser.CANCEL_WITH_SAVEPOINT_OPTION;
/**
- * Command line options for the CANCEL command
+ * Command line options for the CANCEL command.
*/
public class CancelOptions extends CommandLineOptions {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
index 932c66d..027be07 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
/**
@@ -27,4 +28,4 @@ public class CliArgsException extends Exception {
public CliArgsException(String message) {
super(message);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index f1766b0..9e54ab7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -15,20 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.configuration.ConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A simple command line parser (based on Apache Commons CLI) that extracts command
* line options.
@@ -185,7 +186,6 @@ public class CliFrontendParser {
return addCustomCliOptions(options, true);
}
-
private static Options getJobManagerAddressOption(Options options) {
options.addOption(ADDRESS_OPTION);
return options;
@@ -374,7 +374,7 @@ public class CliFrontendParser {
}
/**
- * Adds custom cli options
+ * Adds custom cli options.
* @param options The options to add options to
* @param runOptions Whether to include run options
* @return Options with additions
@@ -390,7 +390,7 @@ public class CliFrontendParser {
}
/**
- * Prints custom cli options
+ * Prints custom cli options.
* @param formatter The formatter to use for printing
* @param runOptions True if the run options should be printed, False to print only general options
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
index f6f6319..a9a29b2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
@@ -20,8 +20,8 @@ package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
-import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
/**
* Base class for all options parsed from the command line.
@@ -35,7 +35,6 @@ public abstract class CommandLineOptions {
private final boolean printHelp;
-
protected CommandLineOptions(CommandLine line) {
this.commandLine = line;
this.printHelp = line.hasOption(HELP_OPTION.getOpt());
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index a4cb479..9ddaf9e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -15,24 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+
import java.net.URL;
import java.util.List;
-
/**
* Custom command-line interface to load hooks for the command-line interface.
*/
public interface CustomCommandLine<ClusterType extends ClusterClient> {
/**
- * Signals whether the custom command-line wants to execute or not
+ * Signals whether the custom command-line wants to execute or not.
* @param commandLine The command-line options
* @param configuration The Flink configuration
* @return True if the command-line wants to run, False otherwise
@@ -40,7 +41,7 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
boolean isActive(CommandLine commandLine, Configuration configuration);
/**
- * Gets the unique identifier of this CustomCommandLine
+ * Gets the unique identifier of this CustomCommandLine.
* @return A unique identifier
*/
String getId();
@@ -58,7 +59,7 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
void addGeneralOptions(Options baseOptions);
/**
- * Retrieves a client for a running cluster
+ * Retrieves a client for a running cluster.
* @param commandLine The command-line parameters from the CliFrontend
* @param config The Flink config
* @return Client if a cluster could be retrieved
@@ -69,7 +70,7 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
Configuration config) throws UnsupportedOperationException;
/**
- * Creates the client for the cluster
+ * Creates the client for the cluster.
* @param applicationName The application name to use
* @param commandLine The command-line options parsed by the CliFrontend
* @param config The Flink config to use
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index e94c2f9..49e9752 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -15,10 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.client.cli;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
+package org.apache.flink.client.cli;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
@@ -26,6 +24,9 @@ import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
index 83f5c38..559ce94 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
@@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
/**
- * Command line options for the INFO command
+ * Command line options for the INFO command.
*/
public class InfoOptions extends ProgramOptions {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
index 45f39a4..7ae00cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
@@ -23,7 +24,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.RUNNING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SCHEDULED_OPTION;
/**
- * Command line options for the LIST command
+ * Command line options for the LIST command.
*/
public class ListOptions extends CommandLineOptions {
@@ -43,4 +44,4 @@ public class ListOptions extends CommandLineOptions {
public boolean getScheduled() {
return scheduled;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 80f573e..df25e67 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -15,12 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
-import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.commons.cli.CommandLine;
+
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
index 2e4eb31..08a15d3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
@@ -27,4 +28,4 @@ public class RunOptions extends ProgramOptions {
public RunOptions(CommandLine line) throws CliArgsException {
super(line);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
index 305b0b4..1c281d6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
@@ -23,7 +24,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DISPOSE_OPTION;
/**
- * Command line options for the SAVEPOINT command
+ * Command line options for the SAVEPOINT command.
*/
public class SavepointOptions extends CommandLineOptions {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
index 7f246c8..6fb03ec 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.cli;
import org.apache.commons.cli.CommandLine;
/**
- * Command line options for the STOP command
+ * Command line options for the STOP command.
*/
public class StopOptions extends CommandLineOptions {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index 59cece3..29836a4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -18,7 +18,6 @@
package org.apache.flink.client.deployment;
-
import org.apache.flink.client.program.ClusterClient;
/**
@@ -27,7 +26,7 @@ import org.apache.flink.client.program.ClusterClient;
public interface ClusterDescriptor<ClientType extends ClusterClient> {
/**
- * Returns a String containing details about the cluster (NodeManagers, available memory, ...)
+ * Returns a String containing details about the cluster (NodeManagers, available memory, ...).
*
*/
String getClusterDescription();
@@ -41,7 +40,7 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
ClientType retrieve(String applicationID) throws UnsupportedOperationException;
/**
- * Triggers deployment of a cluster
+ * Triggers deployment of a cluster.
* @return Client for the cluster
* @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 7a3d4d4..699de3b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.deployment;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-
/**
- * A deployment descriptor for an existing cluster
+ * A deployment descriptor for an existing cluster.
*/
public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterClient> {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index e7314eb..3018a8c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -18,7 +18,6 @@
package org.apache.flink.client.program;
-import akka.actor.ActorSystem;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
@@ -59,13 +58,10 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+
+import akka.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -75,32 +71,38 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
/**
* Encapsulates the functionality necessary to submit a program to a remote cluster.
*/
public abstract class ClusterClient {
- private final Logger LOG = LoggerFactory.getLogger(getClass());
+ private final Logger log = LoggerFactory.getLogger(getClass());
- /** The optimizer used in the optimization of batch programs */
+ /** The optimizer used in the optimization of batch programs. */
final Optimizer compiler;
/** The actor system used to communicate with the JobManager. Lazily initialized upon first use */
protected final LazyActorSystemLoader actorSystemLoader;
- /** Configuration of the client */
+ /** Configuration of the client. */
protected final Configuration flinkConfig;
- /** Timeout for futures */
+ /** Timeout for futures. */
protected final FiniteDuration timeout;
- /** Lookup timeout for the job manager retrieval service */
+ /** Lookup timeout for the job manager retrieval service. */
private final FiniteDuration lookupTimeout;
- /** Service factory for high availability */
+ /** Service factory for high availability. */
protected final HighAvailabilityServices highAvailabilityServices;
- /** Flag indicating whether to sysout print execution updates */
+ /** Flag indicating whether to sysout print execution updates. */
private boolean printStatusDuringExecution = true;
/**
@@ -110,7 +112,7 @@ public abstract class ClusterClient {
*/
private JobExecutionResult lastJobExecutionResult;
- /** Switch for blocking/detached job submission of the client */
+ /** Switch for blocking/detached job submission of the client. */
private boolean detachedJobSubmission = false;
// ------------------------------------------------------------------------
@@ -153,7 +155,7 @@ public abstract class ClusterClient {
highAvailabilityServices,
Time.milliseconds(lookupTimeout.toMillis()),
flinkConfig,
- LOG);
+ log);
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
}
@@ -162,9 +164,12 @@ public abstract class ClusterClient {
// Startup & Shutdown
// ------------------------------------------------------------------------
+ /**
+ * Utility class to lazily instantiate an {@link ActorSystem}.
+ */
protected static class LazyActorSystemLoader {
- private final Logger LOG;
+ private final Logger log;
private final HighAvailabilityServices highAvailabilityServices;
@@ -178,11 +183,11 @@ public abstract class ClusterClient {
HighAvailabilityServices highAvailabilityServices,
Time timeout,
Configuration configuration,
- Logger LOG) {
+ Logger log) {
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
this.timeout = Preconditions.checkNotNull(timeout);
this.configuration = Preconditions.checkNotNull(configuration);
- this.LOG = Preconditions.checkNotNull(LOG);
+ this.log = Preconditions.checkNotNull(log);
}
/**
@@ -210,7 +215,7 @@ public abstract class ClusterClient {
if (!isLoaded()) {
// start actor system
- LOG.info("Starting client actor system.");
+ log.info("Starting client actor system.");
final InetAddress ownHostname;
try {
@@ -296,15 +301,13 @@ public abstract class ClusterClient {
// ------------------------------------------------------------------------
public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
- throws CompilerException, ProgramInvocationException
- {
+ throws CompilerException, ProgramInvocationException {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
}
public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
- throws CompilerException, ProgramInvocationException
- {
+ throws CompilerException, ProgramInvocationException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
@@ -347,8 +350,7 @@ public abstract class ClusterClient {
* @throws ProgramInvocationException
*/
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
- throws ProgramInvocationException, ProgramMissingJobException
- {
+ throws ProgramInvocationException, ProgramMissingJobException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
@@ -362,7 +364,7 @@ public abstract class ClusterClient {
return run(jobWithJars, parallelism, prog.getSavepointSettings());
}
else if (prog.isUsingInteractiveMode()) {
- LOG.info("Starting program in interactive mode");
+ log.info("Starting program in interactive mode");
final List<URL> libraries;
if (hasUserJarsInClassPath(prog.getAllLibraries())) {
@@ -436,8 +438,7 @@ public abstract class ClusterClient {
public JobSubmissionResult run(FlinkPlan compiledPlan,
List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
- throws ProgramInvocationException
- {
+ throws ProgramInvocationException {
JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
return submitJob(job, classLoader);
}
@@ -507,7 +508,7 @@ public abstract class ClusterClient {
}
/**
- * Reattaches to a running job from the supplied job id
+ * Reattaches to a running job from the supplied job id.
* @param jobID The job id of the job to attach to
* @return The JobExecutionResult for the jobID
* @throws JobExecutionException if an error occurs during monitoring the job execution
@@ -612,7 +613,7 @@ public abstract class ClusterClient {
* Stopping works only for streaming programs. Be aware, that the program might continue to run for
* a while after sending the stop command, because after sources stopped to emit data all operators
* need to finish processing.
- *
+ *
* @param jobId
* the job ID of the streaming program to stop
* @throws Exception
@@ -632,10 +633,10 @@ public abstract class ClusterClient {
final Object result = Await.result(response, timeout);
if (result instanceof JobManagerMessages.StoppingSuccess) {
- LOG.info("Job stopping with ID " + jobId + " succeeded.");
+ log.info("Job stopping with ID " + jobId + " succeeded.");
} else if (result instanceof JobManagerMessages.StoppingFailure) {
final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
- LOG.info("Job stopping with ID " + jobId + " failed.", t);
+ log.info("Job stopping with ID " + jobId + " failed.", t);
throw new Exception("Failed to stop the job because of \n" + t.getMessage());
} else {
throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
@@ -685,14 +686,13 @@ public abstract class ClusterClient {
}
}
-
// ------------------------------------------------------------------------
// Sessions
// ------------------------------------------------------------------------
/**
* Tells the JobManager to finish the session (job) defined by the given ID.
- *
+ *
* @param jobId The ID that identifies the session.
*/
public void endSession(JobID jobId) throws Exception {
@@ -713,10 +713,10 @@ public abstract class ClusterClient {
}
ActorGateway jobManagerGateway = getJobManagerGateway();
-
+
for (JobID jid : jobIds) {
if (jid != null) {
- LOG.info("Telling job manager to end the session {}.", jid);
+ log.info("Telling job manager to end the session {}.", jid);
jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
}
}
@@ -760,7 +760,7 @@ public abstract class ClusterClient {
throw new RuntimeException("URL is invalid. This should not happen.", e);
}
}
-
+
job.setClasspaths(classpaths);
return job;
@@ -778,7 +778,7 @@ public abstract class ClusterClient {
* @throws Exception
*/
public ActorGateway getJobManagerGateway() throws Exception {
- LOG.debug("Looking up JobManager");
+ log.debug("Looking up JobManager");
try {
return LeaderRetrievalUtils.retrieveLeaderGateway(
@@ -796,7 +796,7 @@ public abstract class ClusterClient {
* @param message The message to log/print
*/
protected void logAndSysout(String message) {
- LOG.info(message);
+ log.info(message);
if (printStatusDuringExecution) {
System.out.println(message);
}
@@ -809,18 +809,18 @@ public abstract class ClusterClient {
/**
* Blocks until the client has determined that the cluster is ready for Job submission.
*
- * This is delayed until right before job submission to report any other errors first
+ * <p>This is delayed until right before job submission to report any other errors first
* (e.g. invalid job definitions/errors in the user jar)
*/
public abstract void waitForClusterToBeReady();
/**
- * Returns an URL (as a string) to the JobManager web interface
+ * Returns an URL (as a string) to the JobManager web interface.
*/
public abstract String getWebInterfaceURL();
/**
- * Returns the latest cluster status, with number of Taskmanagers and slots
+ * Returns the latest cluster status, with number of Taskmanagers and slots.
*/
public abstract GetClusterStatusResponse getClusterStatus();
@@ -857,7 +857,7 @@ public abstract class ClusterClient {
}
/**
- * Return the Flink configuration object
+ * Return the Flink configuration object.
* @return The Flink configuration object
*/
public Configuration getFlinkConfiguration() {
@@ -865,7 +865,7 @@ public abstract class ClusterClient {
}
/**
- * The client may define an upper limit on the number of slots to use
+ * The client may define an upper limit on the number of slots to use.
* @return -1 if unknown
*/
public abstract int getMaxSlots();
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 1ef94ce..7e47825 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -40,11 +40,11 @@ public class ContextEnvironment extends ExecutionEnvironment {
protected final List<URL> jarFilesToAttach;
protected final List<URL> classpathsToAttach;
-
+
protected final ClassLoader userCodeClassLoader;
protected final SavepointRestoreSettings savepointSettings;
-
+
public ContextEnvironment(ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths,
ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings) {
this.client = remoteConnection;
@@ -83,11 +83,11 @@ public class ContextEnvironment extends ExecutionEnvironment {
return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
+ ") : " + getIdString();
}
-
+
public ClusterClient getClient() {
return this.client;
}
-
+
public List<URL> getJars(){
return jarFilesToAttach;
}
@@ -105,11 +105,11 @@ public class ContextEnvironment extends ExecutionEnvironment {
}
// --------------------------------------------------------------------------------------------
-
+
static void setAsContext(ContextEnvironmentFactory factory) {
initializeContextEnvironment(factory);
}
-
+
static void unsetContext() {
resetContextEnvironment();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index 0175d4c..6209254 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -51,8 +51,7 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
- boolean isDetached, SavepointRestoreSettings savepointSettings)
- {
+ boolean isDetached, SavepointRestoreSettings savepointSettings) {
this.client = client;
this.jarFilesToAttach = jarFilesToAttach;
this.classpathsToAttach = classpathsToAttach;
@@ -68,9 +67,9 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
}
- lastEnvCreated = isDetached ?
- new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings):
- new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
+ lastEnvCreated = isDetached
+ ? new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings)
+ : new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
if (defaultParallelism > 0) {
lastEnvCreated.setParallelism(defaultParallelism);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
index c67688f..63aa811 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +77,9 @@ public class DetachedEnvironment extends ContextEnvironment {
return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
}
+ /**
+ * The {@link JobExecutionResult} returned by a {@link DetachedEnvironment}.
+ */
public static final class DetachedJobExecutionResult extends JobExecutionResult {
public static final DetachedJobExecutionResult INSTANCE = new DetachedJobExecutionResult();
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index d5a3014..ae94ece 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -18,6 +18,9 @@
package org.apache.flink.client.program;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -26,21 +29,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
-
/**
* A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain
* the classes of the functions and libraries necessary for the execution.
*/
public class JobWithJars {
-
+
private Plan plan;
-
+
private List<URL> jarFiles;
/**
- * classpaths that are needed during user code execution
+ * classpaths that are needed during user code execution.
*/
private List<URL> classpaths;
@@ -68,7 +68,7 @@ public class JobWithJars {
this.jarFiles = Collections.singletonList(jarFile);
this.classpaths = Collections.<URL>emptyList();
}
-
+
JobWithJars(Plan plan, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader) {
this.plan = plan;
this.jarFiles = jarFiles;
@@ -77,7 +77,7 @@ public class JobWithJars {
}
/**
- * Returns the plan
+ * Returns the plan.
*/
public Plan getPlan() {
return this.plan;
@@ -89,17 +89,17 @@ public class JobWithJars {
public List<URL> getJarFiles() {
return this.jarFiles;
}
-
+
/**
* Returns list of classpaths that need to be submitted with the plan.
*/
public List<URL> getClasspaths() {
return classpaths;
}
-
+
/**
* Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
- *
+ *
* @return The user code ClassLoader.
*/
public ClassLoader getUserCodeClassLoader() {
@@ -108,7 +108,6 @@ public class JobWithJars {
}
return this.userCodeClassLoader;
}
-
public static void checkJarFile(URL jar) throws IOException {
File jarFile;
@@ -125,7 +124,7 @@ public class JobWithJars {
}
// TODO: Check if proper JAR file
}
-
+
public static ClassLoader buildUserCodeClassLoader(List<URL> jars, List<URL> classpaths, ClassLoader parent) {
URL[] urls = new URL[jars.size() + classpaths.size()];
for (int i = 0; i < jars.size(); i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index 64076de..faacd9f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -28,6 +28,9 @@ import org.apache.flink.optimizer.plan.FlinkPlan;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+/**
+ * An {@link ExecutionEnvironment} that never executes a job but only creates the optimized plan.
+ */
public class OptimizerPlanEnvironment extends ExecutionEnvironment {
private final Optimizer compiler;
@@ -41,7 +44,7 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
// ------------------------------------------------------------------------
// Execution Environment methods
// ------------------------------------------------------------------------
-
+
@Override
public JobExecutionResult execute(String jobName) throws Exception {
Plan plan = createProgramPlan(jobName);
@@ -66,7 +69,7 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
}
public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
-
+
// temporarily write syserr and sysout to a byte array.
PrintStream originalOut = System.out;
PrintStream originalErr = System.err;
@@ -98,10 +101,10 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
String stdout = baos.toString();
String stderr = baes.toString();
-
+
throw new ProgramInvocationException(
"The program plan could not be fetched - the program aborted pre-maturely."
- + "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stdout)
+ + "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stdout)
+ "\n\nSystem.out: " + (stderr.length() == 0 ? "(none)" : stderr));
}
// ------------------------------------------------------------------------
@@ -116,13 +119,13 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment {
};
initializeContextEnvironment(factory);
}
-
+
private void unsetAsContext() {
resetContextEnvironment();
}
// ------------------------------------------------------------------------
-
+
public void setPlan(FlinkPlan plan){
this.optimizerPlan = plan;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index a44ee46..35bb04f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.client.program;
import org.apache.flink.api.common.Plan;
@@ -63,7 +62,7 @@ public class PackagedProgram {
* Property name of the entry in JAR manifest file that describes the Flink specific entry point.
*/
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
-
+
/**
* Property name of the entry in JAR manifest file that describes the class with the main method.
*/
@@ -74,17 +73,17 @@ public class PackagedProgram {
private final URL jarFile;
private final String[] args;
-
+
private final Program program;
-
+
private final Class<?> mainClass;
-
+
private final List<File> extractedTempLibraries;
private final List<URL> classpaths;
-
+
private ClassLoader userCodeClassLoader;
-
+
private Plan plan;
private SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
@@ -110,7 +109,7 @@ public class PackagedProgram {
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* argument.
- *
+ *
* @param jarFile
* The jar file which contains the plan and a Manifest which defines
* the program-class
@@ -131,7 +130,7 @@ public class PackagedProgram {
* Creates an instance that wraps the plan defined in the jar file using the given
* arguments. For generating the plan the class defined in the className parameter
* is used.
- *
+ *
* @param jarFile
* The jar file which contains the plan.
* @param entryPointClassName
@@ -152,7 +151,7 @@ public class PackagedProgram {
* Creates an instance that wraps the plan defined in the jar file using the given
* arguments. For generating the plan the class defined in the className parameter
* is used.
- *
+ *
* @param jarFile
* The jar file which contains the plan.
* @param classpaths
@@ -171,32 +170,32 @@ public class PackagedProgram {
if (jarFile == null) {
throw new IllegalArgumentException("The jar file must not be null.");
}
-
+
URL jarFileUrl;
try {
jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
} catch (MalformedURLException e1) {
throw new IllegalArgumentException("The jar file path is invalid.");
}
-
+
checkJarFile(jarFileUrl);
-
+
this.jarFile = jarFileUrl;
this.args = args == null ? new String[0] : args;
-
+
// if no entryPointClassName name was given, we try and look one up through the manifest
if (entryPointClassName == null) {
entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl);
}
-
+
// now that we have an entry point, we can extract the nested jar files (if any)
this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
this.classpaths = classpaths;
this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
-
+
// load the entry point class
this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
-
+
// if the entry point is a program, instantiate the class and get the plan
if (Program.class.isAssignableFrom(this.mainClass)) {
Program prg = null;
@@ -206,7 +205,7 @@ public class PackagedProgram {
// validate that the class has a main method at least.
// the main method possibly instantiates the program properly
if (!hasMainMethod(mainClass)) {
- throw new ProgramInvocationException("The given program class implements the " +
+ throw new ProgramInvocationException("The given program class implements the " +
Program.class.getName() + " interface, but cannot be instantiated. " +
"It also declares no main(String[]) method as alternative entry point", e);
}
@@ -217,22 +216,22 @@ public class PackagedProgram {
} else if (hasMainMethod(mainClass)) {
this.program = null;
} else {
- throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
+ throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
Program.class.getName() + " interface.");
}
}
-
+
PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException {
this.jarFile = null;
this.args = args == null ? new String[0] : args;
-
+
this.extractedTempLibraries = Collections.emptyList();
this.classpaths = Collections.emptyList();
this.userCodeClassLoader = entryPointClass.getClassLoader();
-
+
// load the entry point class
this.mainClass = entryPointClass;
-
+
// if the entry point is a program, instantiate the class and get the plan
if (Program.class.isAssignableFrom(this.mainClass)) {
Program prg = null;
@@ -242,7 +241,7 @@ public class PackagedProgram {
// validate that the class has a main method at least.
// the main method possibly instantiates the program properly
if (!hasMainMethod(mainClass)) {
- throw new ProgramInvocationException("The given program class implements the " +
+ throw new ProgramInvocationException("The given program class implements the " +
Program.class.getName() + " interface, but cannot be instantiated. " +
"It also declares no main(String[]) method as alternative entry point", e);
}
@@ -253,7 +252,7 @@ public class PackagedProgram {
} else if (hasMainMethod(mainClass)) {
this.program = null;
} else {
- throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
+ throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
Program.class.getName() + " interface.");
}
}
@@ -269,15 +268,15 @@ public class PackagedProgram {
public String[] getArguments() {
return this.args;
}
-
+
public String getMainClassName() {
return this.mainClass.getName();
}
-
+
public boolean isUsingInteractiveMode() {
return this.program == null;
}
-
+
public boolean isUsingProgramEntryPoint() {
return this.program != null;
}
@@ -410,7 +409,6 @@ public class PackagedProgram {
}
/**
- *
* This method assumes that the context environment is prepared, or the execution
* will be a local execution by default.
*/
@@ -468,11 +466,10 @@ public class PackagedProgram {
deleteExtractedLibraries(this.extractedTempLibraries);
this.extractedTempLibraries.clear();
}
-
-
+
/**
* Returns the plan as generated from the Pact Assembler.
- *
+ *
* @return The program's plan.
* @throws ProgramInvocationException Thrown, if an error occurred in the program while
* creating the program's {@link Plan}.
@@ -482,10 +479,10 @@ public class PackagedProgram {
Thread.currentThread().setContextClassLoader(this.userCodeClassLoader);
this.plan = createPlanFromProgram(this.program, this.args);
}
-
+
return this.plan;
}
-
+
private static boolean hasMainMethod(Class<?> entryClass) {
Method mainMethod;
try {
@@ -494,13 +491,13 @@ public class PackagedProgram {
return false;
}
catch (Throwable t) {
- throw new RuntimeException("Could not look up the main(String[]) method from the class " +
+ throw new RuntimeException("Could not look up the main(String[]) method from the class " +
entryClass.getName() + ": " + t.getMessage(), t);
}
-
+
return Modifier.isStatic(mainMethod.getModifiers()) && Modifier.isPublic(mainMethod.getModifiers());
}
-
+
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
Method mainMethod;
if (!Modifier.isPublic(entryClass.getModifiers())) {
@@ -513,17 +510,17 @@ public class PackagedProgram {
throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");
}
catch (Throwable t) {
- throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
+ throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
entryClass.getName() + ": " + t.getMessage(), t);
}
-
+
if (!Modifier.isStatic(mainMethod.getModifiers())) {
throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");
}
-
+
try {
mainMethod.invoke(null, (Object) args);
}
@@ -574,20 +571,19 @@ public class PackagedProgram {
throw new ProgramInvocationException("The Manifest in the jar file could not be accessed '"
+ jarFile.getPath() + "'. " + ioex.getMessage(), ioex);
}
-
+
if (manifest == null) {
throw new ProgramInvocationException("No manifest found in jar file '" + jarFile.getPath() + "'. The manifest is need to point to the program's main class.");
}
-
+
Attributes attributes = manifest.getMainAttributes();
-
+
// check for a "program-class" entry first
className = attributes.getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
if (className != null) {
return className;
}
-
-
+
// check for a main class
className = attributes.getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
if (className != null) {
@@ -605,7 +601,7 @@ public class PackagedProgram {
}
}
}
-
+
private static Class<?> loadMainClass(String className, ClassLoader cl) throws ProgramInvocationException {
ClassLoader contextCl = null;
try {
@@ -627,20 +623,20 @@ public class PackagedProgram {
}
catch (Throwable t) {
throw new ProgramInvocationException("The program's entry point class '" + className
- + "' caused an exception during initialization: "+ t.getMessage(), t);
+ + "' caused an exception during initialization: " + t.getMessage(), t);
} finally {
if (contextCl != null) {
Thread.currentThread().setContextClassLoader(contextCl);
}
}
}
-
+
/**
* Takes the jar described by the given file and invokes its pact assembler class to
* assemble a plan. The assembler class name is either passed through a parameter,
* or it is read from the manifest of the jar. The assembler is handed the given options
* for its assembly.
- *
+ *
* @param program The program to create the plan for.
* @param options
* The options for the assembler.
@@ -655,33 +651,33 @@ public class PackagedProgram {
throw new ProgramInvocationException("Error while calling the program: " + t.getMessage(), t);
}
}
-
+
/**
* Takes all JAR files that are contained in this program's JAR file and extracts them
* to the system's temp directory.
- *
+ *
* @return The file names of the extracted temporary files.
* @throws ProgramInvocationException Thrown, if the extraction process failed.
*/
public static List<File> extractContainedLibraries(URL jarFile) throws ProgramInvocationException {
-
+
Random rnd = new Random();
-
+
JarFile jar = null;
try {
jar = new JarFile(new File(jarFile.toURI()));
final List<JarEntry> containedJarFileEntries = new ArrayList<JarEntry>();
-
+
Enumeration<JarEntry> entries = jar.entries();
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
String name = entry.getName();
-
+
if (name.length() > 8 && name.startsWith("lib/") && name.endsWith(".jar")) {
containedJarFileEntries.add(entry);
}
}
-
+
if (containedJarFileEntries.isEmpty()) {
return Collections.emptyList();
}
@@ -689,15 +685,15 @@ public class PackagedProgram {
// go over all contained jar files
final List<File> extractedTempLibraries = new ArrayList<File>(containedJarFileEntries.size());
final byte[] buffer = new byte[4096];
-
+
boolean incomplete = true;
-
+
try {
for (int i = 0; i < containedJarFileEntries.size(); i++) {
final JarEntry entry = containedJarFileEntries.get(i);
String name = entry.getName();
name = name.replace(File.separatorChar, '_');
-
+
File tempFile;
try {
tempFile = File.createTempFile(rnd.nextInt(Integer.MAX_VALUE) + "_", name);
@@ -705,21 +701,20 @@ public class PackagedProgram {
}
catch (IOException e) {
throw new ProgramInvocationException(
- "An I/O error occurred while creating temporary file to extract nested library '" +
+ "An I/O error occurred while creating temporary file to extract nested library '" +
entry.getName() + "'.", e);
}
-
+
extractedTempLibraries.add(tempFile);
-
+
// copy the temp file contents to a temporary File
OutputStream out = null;
- InputStream in = null;
+ InputStream in = null;
try {
-
-
+
out = new FileOutputStream(tempFile);
in = new BufferedInputStream(jar.getInputStream(entry));
-
+
int numRead = 0;
while ((numRead = in.read(buffer)) != -1) {
out.write(buffer, 0, numRead);
@@ -738,7 +733,7 @@ public class PackagedProgram {
}
}
}
-
+
incomplete = false;
}
finally {
@@ -746,7 +741,7 @@ public class PackagedProgram {
deleteExtractedLibraries(extractedTempLibraries);
}
}
-
+
return extractedTempLibraries;
}
}
@@ -761,13 +756,13 @@ public class PackagedProgram {
}
}
}
-
+
public static void deleteExtractedLibraries(List<File> tempLibraries) {
for (File f : tempLibraries) {
f.delete();
}
}
-
+
private static void checkJarFile(URL jarfile) throws ProgramInvocationException {
try {
JobWithJars.checkJarFile(jarfile);
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
index 0051e60..271864f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java
@@ -69,11 +69,10 @@ public final class PreviewPlanEnvironment extends ExecutionEnvironment {
};
initializeContextEnvironment(factory);
}
-
+
public void unsetAsContext() {
resetContextEnvironment();
}
-
public void setPreview(String preview) {
this.preview = preview;
@@ -82,4 +81,4 @@ public final class PreviewPlanEnvironment extends ExecutionEnvironment {
public Plan getPlan() {
return plan;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java
index 68bcba6..ee58227 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramInvocationException.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.client.program;
/**
@@ -30,7 +29,7 @@ public class ProgramInvocationException extends Exception {
/**
* Creates a <tt>ProgramInvocationException</tt> with the given message.
- *
+ *
* @param message
* The message for the exception.
*/
@@ -40,7 +39,7 @@ public class ProgramInvocationException extends Exception {
/**
* Creates a <tt>ProgramInvocationException</tt> for the given exception.
- *
+ *
* @param cause
* The exception that causes the program invocation to fail.
*/
@@ -51,7 +50,7 @@ public class ProgramInvocationException extends Exception {
/**
* Creates a <tt>ProgramInvocationException</tt> for the given exception with an
* additional message.
- *
+ *
* @param message
* The additional message.
* @param cause
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index b00e519..19a365e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.client.program;
import org.apache.flink.api.common.JobSubmissionResult;
@@ -25,13 +26,14 @@ import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import java.net.URL;
import java.util.Collections;
import java.util.List;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
/**
* Cluster client for communication with an standalone (on-premise) cluster or an existing cluster that has been
* brought up independently of a specific job.
@@ -49,7 +51,6 @@ public class StandaloneClusterClient extends ClusterClient {
@Override
public void waitForClusterToBeReady() {}
-
@Override
public String getWebInterfaceURL() {
String host = getJobManagerAddress().getHostString();
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 8320e04..28c3226 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.junit.Before;
+
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index 5cc90eb..5a79bb6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -23,10 +23,14 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * Tests for the "info" command.
+ */
public class CliFrontendInfoTest {
-
+
private static PrintStream stdOut;
private static PrintStream capture;
private static ByteArrayOutputStream buffer;
@@ -41,7 +45,7 @@ public class CliFrontendInfoTest {
int retCode = testFrontend.cancel(parameters);
assertTrue(retCode != 0);
}
-
+
// test missing options
{
String[] parameters = {};
@@ -55,7 +59,7 @@ public class CliFrontendInfoTest {
fail("Program caused an exception: " + e.getMessage());
}
}
-
+
@Test
public void testShowExecutionPlan() {
replaceStdOut();
@@ -74,7 +78,7 @@ public class CliFrontendInfoTest {
restoreStdOut();
}
}
-
+
@Test
public void testShowExecutionPlanWithParallelism() {
replaceStdOut();
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 4d3405f..725d95a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -18,17 +18,18 @@
package org.apache.flink.client;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -41,6 +42,9 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for the CANCEL and LIST commands.
+ */
public class CliFrontendListCancelTest {
private static ActorSystem actorSystem;
@@ -56,12 +60,12 @@ public class CliFrontendListCancelTest {
JavaTestKit.shutdownActorSystem(actorSystem);
actorSystem = null;
}
-
+
@BeforeClass
public static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
}
-
+
@Test
public void testCancel() {
try {
@@ -72,7 +76,7 @@ public class CliFrontendListCancelTest {
int retCode = testFrontend.cancel(parameters);
assertTrue(retCode != 0);
}
-
+
// test missing job id
{
String[] parameters = {};
@@ -80,7 +84,7 @@ public class CliFrontendListCancelTest {
int retCode = testFrontend.cancel(parameters);
assertTrue(retCode != 0);
}
-
+
// test cancel properly
{
JobID jid = new JobID();
@@ -96,7 +100,7 @@ public class CliFrontendListCancelTest {
);
final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
-
+
String[] parameters = { jidString };
InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
@@ -205,7 +209,7 @@ public class CliFrontendListCancelTest {
int retCode = testFrontend.list(parameters);
assertTrue(retCode != 0);
}
-
+
// test list properly
{
final UUID leaderSessionID = UUID.randomUUID();
@@ -230,8 +234,7 @@ public class CliFrontendListCancelTest {
}
}
-
- protected static final class InfoListTestCliFrontend extends CliFrontend {
+ private static final class InfoListTestCliFrontend extends CliFrontend {
private ActorGateway jobManagerGateway;
@@ -246,7 +249,7 @@ public class CliFrontendListCancelTest {
}
}
- protected static final class CliJobManager extends FlinkUntypedActor {
+ private static final class CliJobManager extends FlinkUntypedActor {
private final JobID jobID;
private final UUID leaderSessionID;
private final String targetDirectory;
[2/3] flink git commit: [FLINK-6719] Activate strict checkstyle for
flink-clients
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index f47ca69..a75f49b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -18,27 +18,18 @@
package org.apache.flink.client;
-import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
-import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
-import static org.apache.flink.client.CliFrontendTestUtils.getNonJarFilePath;
-import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
-import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.configuration.Configuration;
-
+import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -46,9 +37,24 @@ import org.junit.Test;
import java.io.FileNotFoundException;
import java.net.URL;
-
+import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
+import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
+import static org.apache.flink.client.CliFrontendTestUtils.getNonJarFilePath;
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the RUN command with {@link PackagedProgram PackagedPrograms}.
+ */
public class CliFrontendPackageProgramTest {
-
+
@BeforeClass
public static void init() {
pipeSystemOutToNull();
@@ -75,7 +81,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testFileNotJarFile() {
try {
@@ -97,7 +103,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testVariantWithExplicitJarAndArgumentsOption() {
try {
@@ -125,7 +131,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testVariantWithExplicitJarAndNoArgumentsOption() {
try {
@@ -154,7 +160,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testValidVariantWithNoJarAndNoArgumentsOption() {
try {
@@ -183,7 +189,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testNoJarNoArgumentsAtAll() {
try {
@@ -195,7 +201,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testNonExistingFileWithArguments() {
try {
@@ -227,7 +233,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testNonExistingFileWithoutArguments() {
try {
@@ -251,7 +257,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
/**
* Ensure that we will never have the following error.
*
@@ -276,7 +282,7 @@ public class CliFrontendPackageProgramTest {
* at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
* </pre>
*
- * The test works as follows:
+ * <p>The test works as follows:
*
* <ul>
* <li> Use the CliFrontend to invoke a jar file that loads a class which is only available
@@ -303,7 +309,7 @@ public class CliFrontendPackageProgramTest {
assertArrayEquals(classpath, options.getClasspaths().toArray());
assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, options.getEntryPointClassName());
assertArrayEquals(reducedArguments, options.getProgramArgs());
-
+
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
PackagedProgram prog = spy(frontend.buildProgram(options));
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 91c4cf8..43116e4 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.client;
import org.apache.flink.client.cli.CliFrontendParser;
@@ -24,6 +23,7 @@ import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
import org.junit.BeforeClass;
import org.junit.Test;
@@ -34,14 +34,16 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
+/**
+ * Tests for the RUN command.
+ */
public class CliFrontendRunTest {
-
+
@BeforeClass
public static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
}
-
+
@Test
public void testRun() {
try {
@@ -135,7 +137,7 @@ public class CliFrontendRunTest {
// --------------------------------------------------------------------------------------------
- public static final class RunTestingCliFrontend extends CliFrontend {
+ private static final class RunTestingCliFrontend extends CliFrontend {
private final int expectedParallelism;
private final boolean sysoutLogging;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
index e26d5a5..cfed859 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -18,19 +18,16 @@
package org.apache.flink.client;
-import akka.dispatch.Futures;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import akka.dispatch.Futures;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
-import scala.Option;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -38,6 +35,11 @@ import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.zip.ZipOutputStream;
+import scala.Option;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
@@ -52,6 +54,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for the SAVEPOINT command.
+ */
public class CliFrontendSavepointTest {
private static PrintStream stdOut;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index 9522ac7..fef4880 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -18,16 +18,19 @@
package org.apache.flink.client;
-import akka.actor.*;
-import akka.testkit.JavaTestKit;
-
-import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,8 +38,11 @@ import org.junit.Test;
import java.util.UUID;
import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the STOP command.
+ */
public class CliFrontendStopTest extends TestLogger {
private static ActorSystem actorSystem;
@@ -105,7 +111,7 @@ public class CliFrontendStopTest extends TestLogger {
}
}
- protected static final class StopTestCliFrontend extends CliFrontend {
+ private static final class StopTestCliFrontend extends CliFrontend {
private ActorGateway jobManagerGateway;
@@ -120,7 +126,7 @@ public class CliFrontendStopTest extends TestLogger {
}
}
- protected static final class CliJobManager extends FlinkUntypedActor {
+ private static final class CliJobManager extends FlinkUntypedActor {
private final JobID jobID;
private final UUID leaderSessionID;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index c411a7b..2a20d8e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -16,52 +16,49 @@
* limitations under the License.
*/
-
package org.apache.flink.client;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintStream;
-import java.lang.reflect.Field;
import java.net.MalformedURLException;
-import java.util.Map;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+/**
+ * Test utilities.
+ */
public class CliFrontendTestUtils {
-
+
public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";
-
+
public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "org.apache.flink.client.testjar.JobWithExternalDependency";
public static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
public static final int TEST_JOB_MANAGER_PORT = 55443;
-
-
+
public static String getTestJarPath() throws FileNotFoundException, MalformedURLException {
File f = new File("target/maven-test-jar.jar");
- if(!f.exists()) {
+ if (!f.exists()) {
throw new FileNotFoundException("Test jar not present. Invoke tests using maven "
+ "or build the jar using 'mvn process-test-classes' in flink-clients");
}
return f.getAbsolutePath();
}
-
+
public static String getNonJarFilePath() {
return CliFrontendRunTest.class.getResource("/testconfig/flink-conf.yaml").getFile();
}
-
+
public static String getConfigDir() {
String confFile = CliFrontendRunTest.class.getResource("/testconfig/flink-conf.yaml").getFile();
return new File(confFile).getAbsoluteFile().getParent();
}
-
+
public static String getInvalidConfigDir() {
String confFile = CliFrontendRunTest.class.getResource("/invalidtestconfig/flink-conf.yaml").getFile();
return new File(confFile).getAbsoluteFile().getParent();
@@ -84,8 +81,8 @@ public class CliFrontendTestUtils {
assertEquals(expectedAddress, jobManagerAddress);
assertEquals(expectedPort, jobManagerPort);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
private CliFrontendTestUtils() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index be93949..73e99e5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
+
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,6 +36,9 @@ import java.util.Collections;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
+/**
+ * Tests the hostname resolution of the {@link RemoteExecutor}.
+ */
public class RemoteExecutorHostnameResolutionTest extends TestLogger {
private static final String nonExistingHostname = "foo.bar.com.invalid";
@@ -72,7 +76,7 @@ public class RemoteExecutorHostnameResolutionTest extends TestLogger {
// that is what we want!
}
}
-
+
private static Plan getProgram() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index eb9f3c5..2b760bd 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.client.program;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -35,13 +32,19 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.UUID;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* This test starts a job client without the JobManager being reachable. It
@@ -114,7 +117,7 @@ public class ClientConnectionTest extends TestLogger {
/**
* FLINK-6629
*
- * Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's
+ * <p>Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's
* {@link ActorSystem} and retrieving the leading JobManager.
*/
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 13a2564..9349401 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -18,11 +18,9 @@
package org.apache.flink.client.program;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-
import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.ProgramDescription;
@@ -42,18 +40,19 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.NetUtils;
-
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -61,8 +60,9 @@ import java.net.URL;
import java.util.Collections;
import java.util.UUID;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -87,13 +87,13 @@ public class ClientTest extends TestLogger {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
-
+
Plan plan = env.createProgramPlan();
JobWithJars jobWithJars = new JobWithJars(plan, Collections.<URL>emptyList(), Collections.<URL>emptyList());
program = mock(PackagedProgram.class);
when(program.getPlanWithJars()).thenReturn(jobWithJars);
-
+
final int freePort = NetUtils.getAvailablePort();
config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
@@ -247,7 +247,7 @@ public class ClientTest extends TestLogger {
jobManagerSystem.actorOf(
Props.create(SuccessReturningActor.class),
JobMaster.JOB_MANAGER_NAME);
-
+
PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
when(packagedProgramMock.isUsingInteractiveMode()).thenReturn(true);
doAnswer(new Answer<Void>() {
@@ -280,10 +280,10 @@ public class ClientTest extends TestLogger {
jobManagerSystem.actorOf(
Props.create(FailureReturningActor.class),
JobMaster.JOB_MANAGER_NAME);
-
+
PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
assertNotNull(prg.getPreviewPlan());
-
+
Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, 1);
assertNotNull(op);
@@ -306,7 +306,7 @@ public class ClientTest extends TestLogger {
// --------------------------------------------------------------------------------------------
- public static class SuccessReturningActor extends FlinkUntypedActor {
+ private static class SuccessReturningActor extends FlinkUntypedActor {
private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
@@ -336,7 +336,7 @@ public class ClientTest extends TestLogger {
}
}
- public static class FailureReturningActor extends FlinkUntypedActor {
+ private static class FailureReturningActor extends FlinkUntypedActor {
private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
@@ -353,7 +353,10 @@ public class ClientTest extends TestLogger {
return leaderSessionID;
}
}
-
+
+ /**
+ * A test job.
+ */
public static class TestOptimizerPlan implements ProgramDescription {
@SuppressWarnings("serial")
@@ -369,23 +372,27 @@ public class ClientTest extends TestLogger {
.fieldDelimiter("\t").types(Long.class, Long.class);
DataSet<Tuple2<Long, Long>> result = input.map(
- new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+ new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
- return new Tuple2<Long, Long>(value.f0, value.f1+1);
+ return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
}
});
result.writeAsCsv(args[1], "\n", "\t");
env.execute();
}
+
@Override
public String getDescription() {
return "TestOptimizerPlan <input-file-path> <output-file-path>";
}
}
+ /**
+ * Test job that calls {@link ExecutionEnvironment#execute()} twice.
+ */
public static final class TestExecuteTwice {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute();
@@ -393,44 +400,59 @@ public class ClientTest extends TestLogger {
}
}
+ /**
+ * Test job that uses an eager sink.
+ */
public static final class TestEager {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).collect();
}
}
+ /**
+ * Test job that retrieves the net runtime from the {@link JobExecutionResult}.
+ */
public static final class TestGetRuntime {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getNetRuntime();
}
}
+ /**
+ * Test job that retrieves the job ID from the {@link JobExecutionResult}.
+ */
public static final class TestGetJobID {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getJobID();
}
}
+ /**
+ * Test job that retrieves an accumulator from the {@link JobExecutionResult}.
+ */
public static final class TestGetAccumulator {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getAccumulatorResult(ACCUMULATOR_NAME);
}
}
+ /**
+ * Test job that retrieves all accumulators from the {@link JobExecutionResult}.
+ */
public static final class TestGetAllAccumulator {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getAllAccumulatorResults();
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 3879fa3..97a881c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -21,18 +21,22 @@ package org.apache.flink.client.program;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+/**
+ * Tests for the {@link ClusterClient}.
+ */
public class ClusterClientTest extends TestLogger {
/**
* FLINK-6641
*
- * Tests that the {@link ClusterClient} does not clean up HA data when being shut down.
+ * <p>Tests that the {@link ClusterClient} does not clean up HA data when being shut down.
*/
@Test
public void testClusterClientShutdown() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index 4ec0e47..ae30c3a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -26,25 +26,31 @@ import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import java.io.Serializable;
import static org.junit.Assert.fail;
+/**
+ * Tests that verify subsequent calls to {@link ExecutionEnvironment#getExecutionPlan()} and
+ * {@link ExecutionEnvironment#execute()}/{@link ExecutionEnvironment#createProgramPlan()} do not cause any exceptions.
+ */
@SuppressWarnings("serial")
public class ExecutionPlanAfterExecutionTest extends TestLogger implements Serializable {
@Test
public void testExecuteAfterGetExecutionPlan() {
- ExecutionEnvironment env = new LocalEnvironment();
+ ExecutionEnvironment env = new LocalEnvironment();
env.getConfig().disableSysoutLogging();
-
+
DataSet<Integer> baseSet = env.fromElements(1, 2);
DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
- @Override public Integer map(Integer value) throws Exception { return value * 2; }
- });
+ @Override public Integer map(Integer value) throws Exception {
+ return value * 2;
+ }});
result.output(new DiscardingOutputFormat<Integer>());
try {
@@ -56,16 +62,17 @@ public class ExecutionPlanAfterExecutionTest extends TestLogger implements Seria
fail("Cannot run both #getExecutionPlan and #execute.");
}
}
-
+
@Test
public void testCreatePlanAfterGetExecutionPlan() {
ExecutionEnvironment env = new LocalEnvironment();
-
+
DataSet<Integer> baseSet = env.fromElements(1, 2);
DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
- @Override public Integer map(Integer value) throws Exception { return value * 2; }
- });
+ @Override public Integer map(Integer value) throws Exception {
+ return value * 2;
+ }});
result.output(new DiscardingOutputFormat<Integer>());
try {
@@ -73,7 +80,7 @@ public class ExecutionPlanAfterExecutionTest extends TestLogger implements Seria
env.createProgramPlan();
} catch (Exception e) {
e.printStackTrace();
- fail("Cannot run both #getExecutionPlan and #execute. Message: "+e.getMessage());
+ fail("Cannot run both #getExecutionPlan and #execute. Message: " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index c291ada..9c5a878 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -18,25 +18,31 @@
package org.apache.flink.client.program;
-import static org.junit.Assert.*;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.configuration.Configuration;
+
import org.junit.Test;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the generation of execution plans.
+ */
public class ExecutionPlanCreationTest {
@Test
@@ -44,7 +50,7 @@ public class ExecutionPlanCreationTest {
try {
PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
assertNotNull(prg.getPreviewPlan());
-
+
InetAddress mockAddress = InetAddress.getLocalHost();
InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
@@ -56,15 +62,15 @@ public class ExecutionPlanCreationTest {
Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1);
assertNotNull(op);
-
+
PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
assertNotNull(dumper.getOptimizerPlanAsJSON(op));
-
+
// test HTML escaping
PlanJSONDumpGenerator dumper2 = new PlanJSONDumpGenerator();
dumper2.setEncodeForHTML(true);
String htmlEscaped = dumper2.getOptimizerPlanAsJSON(op);
-
+
assertEquals(-1, htmlEscaped.indexOf('\\'));
}
catch (Exception e) {
@@ -72,30 +78,34 @@ public class ExecutionPlanCreationTest {
fail(e.getMessage());
}
}
-
+
+ /**
+ * A test job.
+ */
public static class TestOptimizerPlan implements ProgramDescription {
-
+
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: TestOptimizerPlan <input-file-path> <output-file-path>");
return;
}
-
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0])
.fieldDelimiter("\t").types(Long.class, Long.class);
-
+
DataSet<Tuple2<Long, Long>> result = input.map(
- new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+ new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
- return new Tuple2<Long, Long>(value.f0, value.f1+1);
+ return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
}
});
result.writeAsCsv(args[1], "\n", "\t");
env.execute();
}
+
@Override
public String getDescription() {
return "TestOptimizerPlan <input-file-path> <output-file-path>";
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index 0ecdc2c..4731d44 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -24,18 +24,19 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.util.StandaloneUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLogger;
+
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
/**
* Tests that verify that the LeaderRetrievalService correctly handles non-resolvable host names
- * and does not fail with another exception
+ * and does not fail with another exception.
*/
public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 95506f4..e68d1dc 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -18,27 +18,30 @@
package org.apache.flink.client.program;
-import java.io.File;
-import java.io.PrintStream;
-
import org.apache.flink.client.CliFrontendTestUtils;
+
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
+import java.io.PrintStream;
+/**
+ * Tests for the {@link PackagedProgram}.
+ */
public class PackagedProgramTest {
@Test
public void testGetPreviewPlan() {
try {
PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
-
+
final PrintStream out = System.out;
final PrintStream err = System.err;
try {
System.setOut(new PrintStream(new NullOutputStream()));
System.setErr(new PrintStream(new NullOutputStream()));
-
+
Assert.assertNotNull(prog.getPreviewPlan());
}
finally {
@@ -52,7 +55,7 @@ public class PackagedProgramTest {
Assert.fail("Test is erroneous: " + e.getMessage());
}
}
-
+
private static final class NullOutputStream extends java.io.OutputStream {
@Override
public void write(int b) {}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
index 55056ca..1923ee6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
@@ -16,15 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.client.testjar;
/**
- * Simulate a class that requires an external dependency
+ * Simulate a class that requires an external dependency.
*
*/
public class JobWithExternalDependency {
-
+
public static final String EXTERNAL_CLASS = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
public static void main(String[] args) throws ClassNotFoundException {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index b4ff616..a7070ef 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -18,36 +18,35 @@
package org.apache.flink.client.testjar;
-import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
/**
* WordCount for placing at least something into the jar file.
*/
public class WordCount {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
+
+ if (!parseParameters(args)) {
return;
}
-
+
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// get input data
DataSet<String> text = getTextDataSet(env);
-
- DataSet<Tuple2<String, Integer>> counts =
+
+ DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
@@ -55,7 +54,7 @@ public class WordCount {
.aggregate(Aggregations.SUM, 1);
// emit result
- if(fileOutput) {
+ if (fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
// execute program
env.execute("WordCount Example");
@@ -63,15 +62,15 @@ public class WordCount {
counts.print();
}
}
-
+
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
-
+
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
- * FlatMapFunction. The function takes a line (String) and splits it into
- * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ * FlatMapFunction. The function takes a line (String) and splits it into
+ * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
*/
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@@ -81,7 +80,7 @@ public class WordCount {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
-
+
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
@@ -90,25 +89,25 @@ public class WordCount {
}
}
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
private static boolean fileOutput = false;
-
+
private static String textPath;
private static String outputPath;
-
+
private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
+
+ if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if(args.length == 2) { // cli line: program {textPath} {outputPath}
+ if (args.length == 2) { // cli line: program {textPath} {outputPath}
textPath = args[0];
outputPath = args[1];
- } else if(args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
+ } else if (args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
Boolean.valueOf(args[1]); // parse verbosity flag
textPath = args[2];
outputPath = args[3];
@@ -123,9 +122,9 @@ public class WordCount {
}
return true;
}
-
+
private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
+ if (fileOutput) {
// read the text file from given input path
return env.readTextFile(textPath);
} else {