You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 06:06:46 UTC
[2/3] flink git commit: [FLINK-6719] Activate strict checkstyle for
flink-clients
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index f47ca69..a75f49b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -18,27 +18,18 @@
package org.apache.flink.client;
-import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
-import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
-import static org.apache.flink.client.CliFrontendTestUtils.getNonJarFilePath;
-import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
-import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.ProgramOptions;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.configuration.Configuration;
-
+import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -46,9 +37,24 @@ import org.junit.Test;
import java.io.FileNotFoundException;
import java.net.URL;
-
+import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
+import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
+import static org.apache.flink.client.CliFrontendTestUtils.getNonJarFilePath;
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the RUN command with {@link PackagedProgram PackagedPrograms}.
+ */
public class CliFrontendPackageProgramTest {
-
+
@BeforeClass
public static void init() {
pipeSystemOutToNull();
@@ -75,7 +81,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testFileNotJarFile() {
try {
@@ -97,7 +103,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testVariantWithExplicitJarAndArgumentsOption() {
try {
@@ -125,7 +131,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testVariantWithExplicitJarAndNoArgumentsOption() {
try {
@@ -154,7 +160,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testValidVariantWithNoJarAndNoArgumentsOption() {
try {
@@ -183,7 +189,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testNoJarNoArgumentsAtAll() {
try {
@@ -195,7 +201,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testNonExistingFileWithArguments() {
try {
@@ -227,7 +233,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testNonExistingFileWithoutArguments() {
try {
@@ -251,7 +257,7 @@ public class CliFrontendPackageProgramTest {
fail(e.getMessage());
}
}
-
+
/**
* Ensure that we will never have the following error.
*
@@ -276,7 +282,7 @@ public class CliFrontendPackageProgramTest {
* at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
* </pre>
*
- * The test works as follows:
+ * <p>The test works as follows:
*
* <ul>
* <li> Use the CliFrontend to invoke a jar file that loads a class which is only available
@@ -303,7 +309,7 @@ public class CliFrontendPackageProgramTest {
assertArrayEquals(classpath, options.getClasspaths().toArray());
assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, options.getEntryPointClassName());
assertArrayEquals(reducedArguments, options.getProgramArgs());
-
+
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
PackagedProgram prog = spy(frontend.buildProgram(options));
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 91c4cf8..43116e4 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.client;
import org.apache.flink.client.cli.CliFrontendParser;
@@ -24,6 +23,7 @@ import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
import org.junit.BeforeClass;
import org.junit.Test;
@@ -34,14 +34,16 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
+/**
+ * Tests for the RUN command.
+ */
public class CliFrontendRunTest {
-
+
@BeforeClass
public static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
}
-
+
@Test
public void testRun() {
try {
@@ -135,7 +137,7 @@ public class CliFrontendRunTest {
// --------------------------------------------------------------------------------------------
- public static final class RunTestingCliFrontend extends CliFrontend {
+ private static final class RunTestingCliFrontend extends CliFrontend {
private final int expectedParallelism;
private final boolean sysoutLogging;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
index e26d5a5..cfed859 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -18,19 +18,16 @@
package org.apache.flink.client;
-import akka.dispatch.Futures;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import akka.dispatch.Futures;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
-import scala.Option;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -38,6 +35,11 @@ import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.zip.ZipOutputStream;
+import scala.Option;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
@@ -52,6 +54,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+/**
+ * Tests for the SAVEPOINT command.
+ */
public class CliFrontendSavepointTest {
private static PrintStream stdOut;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index 9522ac7..fef4880 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -18,16 +18,19 @@
package org.apache.flink.client;
-import akka.actor.*;
-import akka.testkit.JavaTestKit;
-
-import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,8 +38,11 @@ import org.junit.Test;
import java.util.UUID;
import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+/**
+ * Tests for the STOP command.
+ */
public class CliFrontendStopTest extends TestLogger {
private static ActorSystem actorSystem;
@@ -105,7 +111,7 @@ public class CliFrontendStopTest extends TestLogger {
}
}
- protected static final class StopTestCliFrontend extends CliFrontend {
+ private static final class StopTestCliFrontend extends CliFrontend {
private ActorGateway jobManagerGateway;
@@ -120,7 +126,7 @@ public class CliFrontendStopTest extends TestLogger {
}
}
- protected static final class CliJobManager extends FlinkUntypedActor {
+ private static final class CliJobManager extends FlinkUntypedActor {
private final JobID jobID;
private final UUID leaderSessionID;
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index c411a7b..2a20d8e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -16,52 +16,49 @@
* limitations under the License.
*/
-
package org.apache.flink.client;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintStream;
-import java.lang.reflect.Field;
import java.net.MalformedURLException;
-import java.util.Map;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+/**
+ * Test utilities.
+ */
public class CliFrontendTestUtils {
-
+
public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";
-
+
public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "org.apache.flink.client.testjar.JobWithExternalDependency";
public static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
public static final int TEST_JOB_MANAGER_PORT = 55443;
-
-
+
public static String getTestJarPath() throws FileNotFoundException, MalformedURLException {
File f = new File("target/maven-test-jar.jar");
- if(!f.exists()) {
+ if (!f.exists()) {
throw new FileNotFoundException("Test jar not present. Invoke tests using maven "
+ "or build the jar using 'mvn process-test-classes' in flink-clients");
}
return f.getAbsolutePath();
}
-
+
public static String getNonJarFilePath() {
return CliFrontendRunTest.class.getResource("/testconfig/flink-conf.yaml").getFile();
}
-
+
public static String getConfigDir() {
String confFile = CliFrontendRunTest.class.getResource("/testconfig/flink-conf.yaml").getFile();
return new File(confFile).getAbsoluteFile().getParent();
}
-
+
public static String getInvalidConfigDir() {
String confFile = CliFrontendRunTest.class.getResource("/invalidtestconfig/flink-conf.yaml").getFile();
return new File(confFile).getAbsoluteFile().getParent();
@@ -84,8 +81,8 @@ public class CliFrontendTestUtils {
assertEquals(expectedAddress, jobManagerAddress);
assertEquals(expectedPort, jobManagerPort);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
private CliFrontendTestUtils() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index be93949..73e99e5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
+
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,6 +36,9 @@ import java.util.Collections;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
+/**
+ * Tests the hostname resolution of the {@link RemoteExecutor}.
+ */
public class RemoteExecutorHostnameResolutionTest extends TestLogger {
private static final String nonExistingHostname = "foo.bar.com.invalid";
@@ -72,7 +76,7 @@ public class RemoteExecutorHostnameResolutionTest extends TestLogger {
// that is what we want!
}
}
-
+
private static Plan getProgram() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index eb9f3c5..2b760bd 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.client.program;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -35,13 +32,19 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.UUID;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* This test starts a job client without the JobManager being reachable. It
@@ -114,7 +117,7 @@ public class ClientConnectionTest extends TestLogger {
/**
* FLINK-6629
*
- * Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's
+ * <p>Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's
* {@link ActorSystem} and retrieving the leading JobManager.
*/
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 13a2564..9349401 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -18,11 +18,9 @@
package org.apache.flink.client.program;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-
import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.ProgramDescription;
@@ -42,18 +40,19 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.NetUtils;
-
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -61,8 +60,9 @@ import java.net.URL;
import java.util.Collections;
import java.util.UUID;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -87,13 +87,13 @@ public class ClientTest extends TestLogger {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
-
+
Plan plan = env.createProgramPlan();
JobWithJars jobWithJars = new JobWithJars(plan, Collections.<URL>emptyList(), Collections.<URL>emptyList());
program = mock(PackagedProgram.class);
when(program.getPlanWithJars()).thenReturn(jobWithJars);
-
+
final int freePort = NetUtils.getAvailablePort();
config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
@@ -247,7 +247,7 @@ public class ClientTest extends TestLogger {
jobManagerSystem.actorOf(
Props.create(SuccessReturningActor.class),
JobMaster.JOB_MANAGER_NAME);
-
+
PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
when(packagedProgramMock.isUsingInteractiveMode()).thenReturn(true);
doAnswer(new Answer<Void>() {
@@ -280,10 +280,10 @@ public class ClientTest extends TestLogger {
jobManagerSystem.actorOf(
Props.create(FailureReturningActor.class),
JobMaster.JOB_MANAGER_NAME);
-
+
PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
assertNotNull(prg.getPreviewPlan());
-
+
Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, 1);
assertNotNull(op);
@@ -306,7 +306,7 @@ public class ClientTest extends TestLogger {
// --------------------------------------------------------------------------------------------
- public static class SuccessReturningActor extends FlinkUntypedActor {
+ private static class SuccessReturningActor extends FlinkUntypedActor {
private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
@@ -336,7 +336,7 @@ public class ClientTest extends TestLogger {
}
}
- public static class FailureReturningActor extends FlinkUntypedActor {
+ private static class FailureReturningActor extends FlinkUntypedActor {
private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
@@ -353,7 +353,10 @@ public class ClientTest extends TestLogger {
return leaderSessionID;
}
}
-
+
+ /**
+ * A test job.
+ */
public static class TestOptimizerPlan implements ProgramDescription {
@SuppressWarnings("serial")
@@ -369,23 +372,27 @@ public class ClientTest extends TestLogger {
.fieldDelimiter("\t").types(Long.class, Long.class);
DataSet<Tuple2<Long, Long>> result = input.map(
- new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+ new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
- return new Tuple2<Long, Long>(value.f0, value.f1+1);
+ return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
}
});
result.writeAsCsv(args[1], "\n", "\t");
env.execute();
}
+
@Override
public String getDescription() {
return "TestOptimizerPlan <input-file-path> <output-file-path>";
}
}
+ /**
+ * Test job that calls {@link ExecutionEnvironment#execute()} twice.
+ */
public static final class TestExecuteTwice {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute();
@@ -393,44 +400,59 @@ public class ClientTest extends TestLogger {
}
}
+ /**
+ * Test job that uses an eager sink.
+ */
public static final class TestEager {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).collect();
}
}
+ /**
+ * Test job that retrieves the net runtime from the {@link JobExecutionResult}.
+ */
public static final class TestGetRuntime {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getNetRuntime();
}
}
+ /**
+ * Test job that retrieves the job ID from the {@link JobExecutionResult}.
+ */
public static final class TestGetJobID {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getJobID();
}
}
+ /**
+ * Test job that retrieves an accumulator from the {@link JobExecutionResult}.
+ */
public static final class TestGetAccumulator {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getAccumulatorResult(ACCUMULATOR_NAME);
}
}
+ /**
+ * Test job that retrieves all accumulators from the {@link JobExecutionResult}.
+ */
public static final class TestGetAllAccumulator {
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
env.execute().getAllAccumulatorResults();
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 3879fa3..97a881c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -21,18 +21,22 @@ package org.apache.flink.client.program;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+/**
+ * Tests for the {@link ClusterClient}.
+ */
public class ClusterClientTest extends TestLogger {
/**
* FLINK-6641
*
- * Tests that the {@link ClusterClient} does not clean up HA data when being shut down.
+ * <p>Tests that the {@link ClusterClient} does not clean up HA data when being shut down.
*/
@Test
public void testClusterClientShutdown() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index 4ec0e47..ae30c3a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -26,25 +26,31 @@ import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import java.io.Serializable;
import static org.junit.Assert.fail;
+/**
+ * Tests that verify subsequent calls to {@link ExecutionEnvironment#getExecutionPlan()} and
+ * {@link ExecutionEnvironment#execute()}/{@link ExecutionEnvironment#createProgramPlan()} do not cause any exceptions.
+ */
@SuppressWarnings("serial")
public class ExecutionPlanAfterExecutionTest extends TestLogger implements Serializable {
@Test
public void testExecuteAfterGetExecutionPlan() {
- ExecutionEnvironment env = new LocalEnvironment();
+ ExecutionEnvironment env = new LocalEnvironment();
env.getConfig().disableSysoutLogging();
-
+
DataSet<Integer> baseSet = env.fromElements(1, 2);
DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
- @Override public Integer map(Integer value) throws Exception { return value * 2; }
- });
+ @Override public Integer map(Integer value) throws Exception {
+ return value * 2;
+ }});
result.output(new DiscardingOutputFormat<Integer>());
try {
@@ -56,16 +62,17 @@ public class ExecutionPlanAfterExecutionTest extends TestLogger implements Seria
fail("Cannot run both #getExecutionPlan and #execute.");
}
}
-
+
@Test
public void testCreatePlanAfterGetExecutionPlan() {
ExecutionEnvironment env = new LocalEnvironment();
-
+
DataSet<Integer> baseSet = env.fromElements(1, 2);
DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
- @Override public Integer map(Integer value) throws Exception { return value * 2; }
- });
+ @Override public Integer map(Integer value) throws Exception {
+ return value * 2;
+ }});
result.output(new DiscardingOutputFormat<Integer>());
try {
@@ -73,7 +80,7 @@ public class ExecutionPlanAfterExecutionTest extends TestLogger implements Seria
env.createProgramPlan();
} catch (Exception e) {
e.printStackTrace();
- fail("Cannot run both #getExecutionPlan and #execute. Message: "+e.getMessage());
+ fail("Cannot run both #getExecutionPlan and #execute. Message: " + e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index c291ada..9c5a878 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -18,25 +18,31 @@
package org.apache.flink.client.program;
-import static org.junit.Assert.*;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.configuration.Configuration;
+
import org.junit.Test;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the generation of execution plans.
+ */
public class ExecutionPlanCreationTest {
@Test
@@ -44,7 +50,7 @@ public class ExecutionPlanCreationTest {
try {
PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
assertNotNull(prg.getPreviewPlan());
-
+
InetAddress mockAddress = InetAddress.getLocalHost();
InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
@@ -56,15 +62,15 @@ public class ExecutionPlanCreationTest {
Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1);
assertNotNull(op);
-
+
PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
assertNotNull(dumper.getOptimizerPlanAsJSON(op));
-
+
// test HTML escaping
PlanJSONDumpGenerator dumper2 = new PlanJSONDumpGenerator();
dumper2.setEncodeForHTML(true);
String htmlEscaped = dumper2.getOptimizerPlanAsJSON(op);
-
+
assertEquals(-1, htmlEscaped.indexOf('\\'));
}
catch (Exception e) {
@@ -72,30 +78,34 @@ public class ExecutionPlanCreationTest {
fail(e.getMessage());
}
}
-
+
+ /**
+ * A test job.
+ */
public static class TestOptimizerPlan implements ProgramDescription {
-
+
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: TestOptimizerPlan <input-file-path> <output-file-path>");
return;
}
-
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0])
.fieldDelimiter("\t").types(Long.class, Long.class);
-
+
DataSet<Tuple2<Long, Long>> result = input.map(
- new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+ new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
- return new Tuple2<Long, Long>(value.f0, value.f1+1);
+ return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
}
});
result.writeAsCsv(args[1], "\n", "\t");
env.execute();
}
+
@Override
public String getDescription() {
return "TestOptimizerPlan <input-file-path> <output-file-path>";
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index 0ecdc2c..4731d44 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -24,18 +24,19 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.util.StandaloneUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.TestLogger;
+
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
/**
* Tests that verify that the LeaderRetrievalService correctly handles non-resolvable host names
- * and does not fail with another exception
+ * and does not fail with another exception.
*/
public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 95506f4..e68d1dc 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -18,27 +18,30 @@
package org.apache.flink.client.program;
-import java.io.File;
-import java.io.PrintStream;
-
import org.apache.flink.client.CliFrontendTestUtils;
+
import org.junit.Assert;
import org.junit.Test;
+import java.io.File;
+import java.io.PrintStream;
+/**
+ * Tests for the {@link PackagedProgramTest}.
+ */
public class PackagedProgramTest {
@Test
public void testGetPreviewPlan() {
try {
PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
-
+
final PrintStream out = System.out;
final PrintStream err = System.err;
try {
System.setOut(new PrintStream(new NullOutputStream()));
System.setErr(new PrintStream(new NullOutputStream()));
-
+
Assert.assertNotNull(prog.getPreviewPlan());
}
finally {
@@ -52,7 +55,7 @@ public class PackagedProgramTest {
Assert.fail("Test is erroneous: " + e.getMessage());
}
}
-
+
private static final class NullOutputStream extends java.io.OutputStream {
@Override
public void write(int b) {}
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
index 55056ca..1923ee6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
@@ -16,15 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.client.testjar;
/**
- * Simulate a class that requires an external dependency
+ * Simulate a class that requires an external dependency.
*
*/
public class JobWithExternalDependency {
-
+
public static final String EXTERNAL_CLASS = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
public static void main(String[] args) throws ClassNotFoundException {
http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index b4ff616..a7070ef 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -18,36 +18,35 @@
package org.apache.flink.client.testjar;
-import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
/**
* WordCount for placing at least something into the jar file.
*/
public class WordCount {
-
+
// *************************************************************************
// PROGRAM
// *************************************************************************
-
+
public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
+
+ if (!parseParameters(args)) {
return;
}
-
+
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// get input data
DataSet<String> text = getTextDataSet(env);
-
- DataSet<Tuple2<String, Integer>> counts =
+
+ DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
@@ -55,7 +54,7 @@ public class WordCount {
.aggregate(Aggregations.SUM, 1);
// emit result
- if(fileOutput) {
+ if (fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
// execute program
env.execute("WordCount Example");
@@ -63,15 +62,15 @@ public class WordCount {
counts.print();
}
}
-
+
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
-
+
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
- * FlatMapFunction. The function takes a line (String) and splits it into
- * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ * FlatMapFunction. The function takes a line (String) and splits it into
+ * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
*/
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@@ -81,7 +80,7 @@ public class WordCount {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
-
+
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
@@ -90,25 +89,25 @@ public class WordCount {
}
}
}
-
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
-
+
private static boolean fileOutput = false;
-
+
private static String textPath;
private static String outputPath;
-
+
private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
+
+ if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if(args.length == 2) { // cli line: program {textPath} {outputPath}
+ if (args.length == 2) { // cli line: program {textPath} {outputPath}
textPath = args[0];
outputPath = args[1];
- } else if(args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
+ } else if (args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
Boolean.valueOf(args[1]); // parse verbosity flag
textPath = args[2];
outputPath = args[3];
@@ -123,9 +122,9 @@ public class WordCount {
}
return true;
}
-
+
private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
+ if (fileOutput) {
// read the text file from given input path
return env.readTextFile(textPath);
} else {