You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 06:06:46 UTC

[2/3] flink git commit: [FLINK-6719] Activate strict checkstyle for flink-clients

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index f47ca69..a75f49b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -18,27 +18,18 @@
 
 package org.apache.flink.client;
 
-import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
-import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
-import static org.apache.flink.client.CliFrontendTestUtils.getNonJarFilePath;
-import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
-import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.configuration.Configuration;
-
+import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -46,9 +37,24 @@ import org.junit.Test;
 import java.io.FileNotFoundException;
 import java.net.URL;
 
-
+import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
+import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
+import static org.apache.flink.client.CliFrontendTestUtils.getNonJarFilePath;
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the RUN command with {@link PackagedProgram PackagedPrograms}.
+ */
 public class CliFrontendPackageProgramTest {
-	
+
 	@BeforeClass
 	public static void init() {
 		pipeSystemOutToNull();
@@ -75,7 +81,7 @@ public class CliFrontendPackageProgramTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testFileNotJarFile() {
 		try {
@@ -97,7 +103,7 @@ public class CliFrontendPackageProgramTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testVariantWithExplicitJarAndArgumentsOption() {
 		try {
@@ -125,7 +131,7 @@ public class CliFrontendPackageProgramTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testVariantWithExplicitJarAndNoArgumentsOption() {
 		try {
@@ -154,7 +160,7 @@ public class CliFrontendPackageProgramTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testValidVariantWithNoJarAndNoArgumentsOption() {
 		try {
@@ -183,7 +189,7 @@ public class CliFrontendPackageProgramTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testNoJarNoArgumentsAtAll() {
 		try {
@@ -195,7 +201,7 @@ public class CliFrontendPackageProgramTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testNonExistingFileWithArguments() {
 		try {
@@ -227,7 +233,7 @@ public class CliFrontendPackageProgramTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testNonExistingFileWithoutArguments() {
 		try {
@@ -251,7 +257,7 @@ public class CliFrontendPackageProgramTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	/**
 	 * Ensure that we will never have the following error.
 	 *
@@ -276,7 +282,7 @@ public class CliFrontendPackageProgramTest {
 	 *		at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
 	 * </pre>
 	 *
-	 * The test works as follows:
+	 * <p>The test works as follows:
 	 *
 	 * <ul>
 	 *   <li> Use the CliFrontend to invoke a jar file that loads a class which is only available
@@ -303,7 +309,7 @@ public class CliFrontendPackageProgramTest {
 			assertArrayEquals(classpath, options.getClasspaths().toArray());
 			assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, options.getEntryPointClassName());
 			assertArrayEquals(reducedArguments, options.getProgramArgs());
-			
+
 			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 			PackagedProgram prog = spy(frontend.buildProgram(options));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 91c4cf8..43116e4 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client;
 
 import org.apache.flink.client.cli.CliFrontendParser;
@@ -24,6 +23,7 @@ import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -34,14 +34,16 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
+/**
+ * Tests for the RUN command.
+ */
 public class CliFrontendRunTest {
-	
+
 	@BeforeClass
 	public static void init() {
 		CliFrontendTestUtils.pipeSystemOutToNull();
 	}
-	
+
 	@Test
 	public void testRun() {
 		try {
@@ -135,7 +137,7 @@ public class CliFrontendRunTest {
 
 	// --------------------------------------------------------------------------------------------
 
-	public static final class RunTestingCliFrontend extends CliFrontend {
+	private static final class RunTestingCliFrontend extends CliFrontend {
 
 		private final int expectedParallelism;
 		private final boolean sysoutLogging;

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
index e26d5a5..cfed859 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -18,19 +18,16 @@
 
 package org.apache.flink.client;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+
+import akka.dispatch.Futures;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
-import scala.Option;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -38,6 +35,11 @@ import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.zip.ZipOutputStream;
 
+import scala.Option;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
 import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
@@ -52,6 +54,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the SAVEPOINT command.
+ */
 public class CliFrontendSavepointTest {
 
 	private static PrintStream stdOut;

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index 9522ac7..fef4880 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -18,16 +18,19 @@
 
 package org.apache.flink.client;
 
-import akka.actor.*;
-import akka.testkit.JavaTestKit;
-
-import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -35,8 +38,11 @@ import org.junit.Test;
 import java.util.UUID;
 
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the STOP command.
+ */
 public class CliFrontendStopTest extends TestLogger {
 
 	private static ActorSystem actorSystem;
@@ -105,7 +111,7 @@ public class CliFrontendStopTest extends TestLogger {
 		}
 	}
 
-	protected static final class StopTestCliFrontend extends CliFrontend {
+	private static final class StopTestCliFrontend extends CliFrontend {
 
 		private ActorGateway jobManagerGateway;
 
@@ -120,7 +126,7 @@ public class CliFrontendStopTest extends TestLogger {
 		}
 	}
 
-	protected static final class CliJobManager extends FlinkUntypedActor {
+	private static final class CliJobManager extends FlinkUntypedActor {
 		private final JobID jobID;
 		private final UUID leaderSessionID;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index c411a7b..2a20d8e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -16,52 +16,49 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.PrintStream;
-import java.lang.reflect.Field;
 import java.net.MalformedURLException;
-import java.util.Map;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
+/**
+ * Test utilities.
+ */
 public class CliFrontendTestUtils {
-	
+
 	public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";
-	
+
 	public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "org.apache.flink.client.testjar.JobWithExternalDependency";
 
 	public static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
 
 	public static final int TEST_JOB_MANAGER_PORT = 55443;
-	
-	
+
 	public static String getTestJarPath() throws FileNotFoundException, MalformedURLException {
 		File f = new File("target/maven-test-jar.jar");
-		if(!f.exists()) {
+		if (!f.exists()) {
 			throw new FileNotFoundException("Test jar not present. Invoke tests using maven "
 					+ "or build the jar using 'mvn process-test-classes' in flink-clients");
 		}
 		return f.getAbsolutePath();
 	}
-	
+
 	public static String getNonJarFilePath() {
 		return CliFrontendRunTest.class.getResource("/testconfig/flink-conf.yaml").getFile();
 	}
-	
+
 	public static String getConfigDir() {
 		String confFile = CliFrontendRunTest.class.getResource("/testconfig/flink-conf.yaml").getFile();
 		return new File(confFile).getAbsoluteFile().getParent();
 	}
-	
+
 	public static String getInvalidConfigDir() {
 		String confFile = CliFrontendRunTest.class.getResource("/invalidtestconfig/flink-conf.yaml").getFile();
 		return new File(confFile).getAbsoluteFile().getParent();
@@ -84,8 +81,8 @@ public class CliFrontendTestUtils {
 		assertEquals(expectedAddress, jobManagerAddress);
 		assertEquals(expectedPort, jobManagerPort);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private CliFrontendTestUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
index be93949..73e99e5 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -35,6 +36,9 @@ import java.util.Collections;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
+/**
+ * Tests the hostname resolution of the {@link RemoteExecutor}.
+ */
 public class RemoteExecutorHostnameResolutionTest extends TestLogger {
 
 	private static final String nonExistingHostname = "foo.bar.com.invalid";
@@ -72,7 +76,7 @@ public class RemoteExecutorHostnameResolutionTest extends TestLogger {
 			// that is what we want!
 		}
 	}
-	
+
 	private static Plan getProgram() {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<Integer>());

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index eb9f3c5..2b760bd 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.client.program;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -35,13 +32,19 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.UUID;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * This test starts a job client without the JobManager being reachable. It
@@ -114,7 +117,7 @@ public class ClientConnectionTest extends TestLogger {
 	/**
 	 * FLINK-6629
 	 *
-	 * Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's
+	 * <p>Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's
 	 * {@link ActorSystem} and retrieving the leading JobManager.
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 13a2564..9349401 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.client.program;
 
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.ProgramDescription;
@@ -42,18 +40,19 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.NetUtils;
-
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -61,8 +60,9 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.UUID;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -87,13 +87,13 @@ public class ClientTest extends TestLogger {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 		env.generateSequence(1, 1000).output(new DiscardingOutputFormat<Long>());
-		
+
 		Plan plan = env.createProgramPlan();
 		JobWithJars jobWithJars = new JobWithJars(plan, Collections.<URL>emptyList(),  Collections.<URL>emptyList());
 
 		program = mock(PackagedProgram.class);
 		when(program.getPlanWithJars()).thenReturn(jobWithJars);
-		
+
 		final int freePort = NetUtils.getAvailablePort();
 		config = new Configuration();
 		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
@@ -247,7 +247,7 @@ public class ClientTest extends TestLogger {
 			jobManagerSystem.actorOf(
 				Props.create(SuccessReturningActor.class),
 				JobMaster.JOB_MANAGER_NAME);
-			
+
 			PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
 			when(packagedProgramMock.isUsingInteractiveMode()).thenReturn(true);
 			doAnswer(new Answer<Void>() {
@@ -280,10 +280,10 @@ public class ClientTest extends TestLogger {
 			jobManagerSystem.actorOf(
 				Props.create(FailureReturningActor.class),
 				JobMaster.JOB_MANAGER_NAME);
-			
+
 			PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
 			assertNotNull(prg.getPreviewPlan());
-			
+
 			Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
 			OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, 1);
 			assertNotNull(op);
@@ -306,7 +306,7 @@ public class ClientTest extends TestLogger {
 
 	// --------------------------------------------------------------------------------------------
 
-	public static class SuccessReturningActor extends FlinkUntypedActor {
+	private static class SuccessReturningActor extends FlinkUntypedActor {
 
 		private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
 
@@ -336,7 +336,7 @@ public class ClientTest extends TestLogger {
 		}
 	}
 
-	public static class FailureReturningActor extends FlinkUntypedActor {
+	private static class FailureReturningActor extends FlinkUntypedActor {
 
 		private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID;
 
@@ -353,7 +353,10 @@ public class ClientTest extends TestLogger {
 			return leaderSessionID;
 		}
 	}
-	
+
+	/**
+	 * A test job.
+	 */
 	public static class TestOptimizerPlan implements ProgramDescription {
 
 		@SuppressWarnings("serial")
@@ -369,23 +372,27 @@ public class ClientTest extends TestLogger {
 					.fieldDelimiter("\t").types(Long.class, Long.class);
 
 			DataSet<Tuple2<Long, Long>> result = input.map(
-					new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+					new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
 						public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
-							return new Tuple2<Long, Long>(value.f0, value.f1+1);
+							return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
 						}
 					});
 			result.writeAsCsv(args[1], "\n", "\t");
 			env.execute();
 		}
+
 		@Override
 		public String getDescription() {
 			return "TestOptimizerPlan <input-file-path> <output-file-path>";
 		}
 	}
 
+	/**
+	 * Test job that calls {@link ExecutionEnvironment#execute()} twice.
+	 */
 	public static final class TestExecuteTwice {
 
-		public static void main(String args[]) throws Exception {
+		public static void main(String[] args) throws Exception {
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
 			env.execute();
@@ -393,44 +400,59 @@ public class ClientTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Test job that uses an eager sink.
+	 */
 	public static final class TestEager {
 
-		public static void main(String args[]) throws Exception {
+		public static void main(String[] args) throws Exception {
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.fromElements(1, 2).collect();
 		}
 	}
 
+	/**
+	 * Test job that retrieves the net runtime from the {@link JobExecutionResult}.
+	 */
 	public static final class TestGetRuntime {
 
-		public static void main(String args[]) throws Exception {
+		public static void main(String[] args) throws Exception {
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
 			env.execute().getNetRuntime();
 		}
 	}
 
+	/**
+	 * Test job that retrieves the job ID from the {@link JobExecutionResult}.
+	 */
 	public static final class TestGetJobID {
 
-		public static void main(String args[]) throws Exception {
+		public static void main(String[] args) throws Exception {
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
 			env.execute().getJobID();
 		}
 	}
 
+	/**
+	 * Test job that retrieves an accumulator from the {@link JobExecutionResult}.
+	 */
 	public static final class TestGetAccumulator {
 
-		public static void main(String args[]) throws Exception {
+		public static void main(String[] args) throws Exception {
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
 			env.execute().getAccumulatorResult(ACCUMULATOR_NAME);
 		}
 	}
 
+	/**
+	 * Test job that retrieves all accumulators from the {@link JobExecutionResult}.
+	 */
 	public static final class TestGetAllAccumulator {
 
-		public static void main(String args[]) throws Exception {
+		public static void main(String[] args) throws Exception {
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			env.fromElements(1, 2).output(new DiscardingOutputFormat<Integer>());
 			env.execute().getAllAccumulatorResults();

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 3879fa3..97a881c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -21,18 +21,22 @@ package org.apache.flink.client.program;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for the {@link ClusterClient}.
+ */
 public class ClusterClientTest extends TestLogger {
 
 	/**
 	 * FLINK-6641
 	 *
-	 * Tests that the {@link ClusterClient} does not clean up HA data when being shut down.
+	 * <p>Tests that the {@link ClusterClient} does not clean up HA data when being shut down.
 	 */
 	@Test
 	public void testClusterClientShutdown() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index 4ec0e47..ae30c3a 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -26,25 +26,31 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.io.Serializable;
 
 import static org.junit.Assert.fail;
 
+/**
+ * Tests that verify subsequent calls to {@link ExecutionEnvironment#getExecutionPlan()} and
+ * {@link ExecutionEnvironment#execute()}/{@link ExecutionEnvironment#createProgramPlan()} do not cause any exceptions.
+ */
 @SuppressWarnings("serial")
 public class ExecutionPlanAfterExecutionTest extends TestLogger implements Serializable {
 
 	@Test
 	public void testExecuteAfterGetExecutionPlan() {
-		ExecutionEnvironment env = new LocalEnvironment(); 
+		ExecutionEnvironment env = new LocalEnvironment();
 		env.getConfig().disableSysoutLogging();
-		
+
 		DataSet<Integer> baseSet = env.fromElements(1, 2);
 
 		DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
-			@Override public Integer map(Integer value) throws Exception { return value * 2; }
-		});
+			@Override public Integer map(Integer value) throws Exception {
+				return value * 2;
+			}});
 		result.output(new DiscardingOutputFormat<Integer>());
 
 		try {
@@ -56,16 +62,17 @@ public class ExecutionPlanAfterExecutionTest extends TestLogger implements Seria
 			fail("Cannot run both #getExecutionPlan and #execute.");
 		}
 	}
-	
+
 	@Test
 	public void testCreatePlanAfterGetExecutionPlan() {
 		ExecutionEnvironment env = new LocalEnvironment();
-		
+
 		DataSet<Integer> baseSet = env.fromElements(1, 2);
 
 		DataSet<Integer> result = baseSet.map(new MapFunction<Integer, Integer>() {
-			@Override public Integer map(Integer value) throws Exception { return value * 2; }
-		});
+			@Override public Integer map(Integer value) throws Exception {
+				return value * 2;
+			}});
 		result.output(new DiscardingOutputFormat<Integer>());
 
 		try {
@@ -73,7 +80,7 @@ public class ExecutionPlanAfterExecutionTest extends TestLogger implements Seria
 			env.createProgramPlan();
 		} catch (Exception e) {
 			e.printStackTrace();
-			fail("Cannot run both #getExecutionPlan and #execute. Message: "+e.getMessage());
+			fail("Cannot run both #getExecutionPlan and #execute. Message: " + e.getMessage());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index c291ada..9c5a878 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -18,25 +18,31 @@
 
 package org.apache.flink.client.program;
 
-import static org.junit.Assert.*;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.configuration.Configuration;
+
 import org.junit.Test;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the generation of execution plans.
+ */
 public class ExecutionPlanCreationTest {
 
 	@Test
@@ -44,7 +50,7 @@ public class ExecutionPlanCreationTest {
 		try {
 			PackagedProgram prg = new PackagedProgram(TestOptimizerPlan.class, "/dev/random", "/tmp");
 			assertNotNull(prg.getPreviewPlan());
-			
+
 			InetAddress mockAddress = InetAddress.getLocalHost();
 			InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
 
@@ -56,15 +62,15 @@ public class ExecutionPlanCreationTest {
 			Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
 			OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1);
 			assertNotNull(op);
-			
+
 			PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
 			assertNotNull(dumper.getOptimizerPlanAsJSON(op));
-			
+
 			// test HTML escaping
 			PlanJSONDumpGenerator dumper2 = new PlanJSONDumpGenerator();
 			dumper2.setEncodeForHTML(true);
 			String htmlEscaped = dumper2.getOptimizerPlanAsJSON(op);
-			
+
 			assertEquals(-1, htmlEscaped.indexOf('\\'));
 		}
 		catch (Exception e) {
@@ -72,30 +78,34 @@ public class ExecutionPlanCreationTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * A test job.
+	 */
 	public static class TestOptimizerPlan implements ProgramDescription {
-		
+
 		@SuppressWarnings("serial")
 		public static void main(String[] args) throws Exception {
 			if (args.length < 2) {
 				System.err.println("Usage: TestOptimizerPlan <input-file-path> <output-file-path>");
 				return;
 			}
-			
+
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
+
 			DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0])
 					.fieldDelimiter("\t").types(Long.class, Long.class);
-			
+
 			DataSet<Tuple2<Long, Long>> result = input.map(
-					new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
+					new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
 						public Tuple2<Long, Long> map(Tuple2<Long, Long> value){
-							return new Tuple2<Long, Long>(value.f0, value.f1+1);
+							return new Tuple2<Long, Long>(value.f0, value.f1 + 1);
 						}
 			});
 			result.writeAsCsv(args[1], "\n", "\t");
 			env.execute();
 		}
+
 		@Override
 		public String getDescription() {
 			return "TestOptimizerPlan <input-file-path> <output-file-path>";

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
index 0ecdc2c..4731d44 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/LeaderRetrievalServiceHostnameResolutionTest.java
@@ -24,18 +24,19 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.util.StandaloneUtils;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
 /**
  * Tests that verify that the LeaderRetrievalService correctly handles non-resolvable host names
- * and does not fail with another exception
+ * and does not fail with another exception.
  */
 public class LeaderRetrievalServiceHostnameResolutionTest extends TestLogger {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 95506f4..e68d1dc 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -18,27 +18,30 @@
 
 package org.apache.flink.client.program;
 
-import java.io.File;
-import java.io.PrintStream;
-
 import org.apache.flink.client.CliFrontendTestUtils;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.PrintStream;
 
+/**
+ * Tests for the {@link PackagedProgram}.
+ */
 public class PackagedProgramTest {
 
 	@Test
 	public void testGetPreviewPlan() {
 		try {
 			PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
-			
+
 			final PrintStream out = System.out;
 			final PrintStream err = System.err;
 			try {
 				System.setOut(new PrintStream(new NullOutputStream()));
 				System.setErr(new PrintStream(new NullOutputStream()));
-				
+
 				Assert.assertNotNull(prog.getPreviewPlan());
 			}
 			finally {
@@ -52,7 +55,7 @@ public class PackagedProgramTest {
 			Assert.fail("Test is erroneous: " + e.getMessage());
 		}
 	}
-	
+
 	private static final class NullOutputStream extends java.io.OutputStream {
 		@Override
 		public void write(int b) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
index 55056ca..1923ee6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/JobWithExternalDependency.java
@@ -16,15 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.testjar;
 
 /**
- * Simulate a class that requires an external dependency
+ * Simulate a class that requires an external dependency.
  *
  */
 public class JobWithExternalDependency {
-	
+
 	public static final String EXTERNAL_CLASS = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
 
 	public static void main(String[] args) throws ClassNotFoundException {

http://git-wip-us.apache.org/repos/asf/flink/blob/c793ea41/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
index b4ff616..a7070ef 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/WordCount.java
@@ -18,36 +18,35 @@
 
 package org.apache.flink.client.testjar;
 
-import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
 
 /**
  * WordCount for placing at least something into the jar file.
  */
 public class WordCount {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
+
 	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
+
+		if (!parseParameters(args)) {
 			return;
 		}
-		
+
 		// set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		// get input data
 		DataSet<String> text = getTextDataSet(env);
-		
-		DataSet<Tuple2<String, Integer>> counts = 
+
+		DataSet<Tuple2<String, Integer>> counts =
 				// split up the lines in pairs (2-tuples) containing: (word,1)
 				text.flatMap(new Tokenizer())
 				// group by the tuple field "0" and sum up tuple field "1"
@@ -55,7 +54,7 @@ public class WordCount {
 				.aggregate(Aggregations.SUM, 1);
 
 		// emit result
-		if(fileOutput) {
+		if (fileOutput) {
 			counts.writeAsCsv(outputPath, "\n", " ");
 			// execute program
 			env.execute("WordCount Example");
@@ -63,15 +62,15 @@ public class WordCount {
 			counts.print();
 		}
 	}
-	
+
 	// *************************************************************************
 	//     USER FUNCTIONS
 	// *************************************************************************
-	
+
 	/**
 	 * Implements the string tokenizer that splits sentences into words as a user-defined
-	 * FlatMapFunction. The function takes a line (String) and splits it into 
-	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 * FlatMapFunction. The function takes a line (String) and splits it into
+	 * multiple pairs in the form of "(word,1)" (Tuple2&lt;String, Integer&gt;).
 	 */
 	public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
 
@@ -81,7 +80,7 @@ public class WordCount {
 		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
 			// normalize and split the line
 			String[] tokens = value.toLowerCase().split("\\W+");
-			
+
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
@@ -90,25 +89,25 @@ public class WordCount {
 			}
 		}
 	}
-	
+
 	// *************************************************************************
 	//     UTIL METHODS
 	// *************************************************************************
-	
+
 	private static boolean fileOutput = false;
-	
+
 	private static String textPath;
 	private static String outputPath;
-	
+
 	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
+
+		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
-			if(args.length == 2) { // cli line: program {textPath} {outputPath}
+			if (args.length == 2) { // cli line: program {textPath} {outputPath}
 				textPath = args[0];
 				outputPath = args[1];
-			} else if(args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
+			} else if (args.length == 4 && (args[0].startsWith("-v") || args[0].startsWith("--verbose"))) { // cli line: program {optArg} {optVal} {textPath} {outputPath}
 				Boolean.valueOf(args[1]); // parse verbosity flag
 				textPath = args[2];
 				outputPath = args[3];
@@ -123,9 +122,9 @@ public class WordCount {
 		}
 		return true;
 	}
-	
+
 	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
+		if (fileOutput) {
 			// read the text file from given input path
 			return env.readTextFile(textPath);
 		} else {