You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/11 13:57:20 UTC

[2/7] flink git commit: [FLINK-6903] [runtime] Activate checkstyle for runtime/akka

[FLINK-6903] [runtime] Activate checkstyle for runtime/akka

This closes #4114


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b91df160
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b91df160
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b91df160

Branch: refs/heads/master
Commit: b91df16071145911a85a3e76a821a7ff6709b34f
Parents: a68c15f
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jun 12 13:50:07 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 07:49:35 2017 -0400

----------------------------------------------------------------------
 flink-runtime/pom.xml                            |  1 -
 .../runtime/akka/DefaultQuarantineHandler.java   |  6 ++++--
 .../flink/runtime/akka/FlinkUntypedActor.java    | 17 ++++++++++-------
 .../flink/runtime/akka/QuarantineHandler.java    |  4 ++--
 .../flink/runtime/akka/QuarantineMonitor.java    |  5 +++--
 .../runtime/akka/FlinkUntypedActorTest.java      | 14 +++++++++-----
 .../runtime/akka/QuarantineMonitorTest.java      | 19 ++++++++++++-------
 7 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 8800f74..5644cfd 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -426,7 +426,6 @@ under the License.
 					<logViolationsToConsole>true</logViolationsToConsole>
 					<failOnViolation>true</failOnViolation>
 					<excludes>
-						**/runtime/akka/**,
 						**/runtime/blob/**,
 						**/runtime/checkpoint/**,
 						**/runtime/client/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
index 378cb25..708437f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.akka;
 
-import akka.actor.ActorSystem;
-import akka.actor.Address;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
 import org.slf4j.Logger;
+
 import scala.concurrent.duration.FiniteDuration;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 05ae501..8078c26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.akka;
 
-import akka.actor.UntypedActor;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import akka.actor.UntypedActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,20 +34,22 @@ import java.util.UUID;
  * of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is
  * detected, then an Exception is thrown.
  *
- * In order to implement the actor behavior, an implementing subclass has to override the method
+ * <p>In order to implement the actor behavior, an implementing subclass has to override the method
  * handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide
  * a leader session ID option which is returned by getLeaderSessionID.
  */
 public abstract class FlinkUntypedActor extends UntypedActor {
 
+	//CHECKSTYLE.OFF: MemberNameCheck - re-enable after JobManager/TaskManager refactoring in FLIP-6?
 	protected final Logger LOG = LoggerFactory.getLogger(getClass());
+	//CHECKSTYLE.ON: MemberNameCheck
 
 	/**
 	 * This method is called by Akka if a new message has arrived for the actor. It logs the
 	 * processing time of the incoming message if the logging level is set to debug. After logging
 	 * the handleLeaderSessionID method is called.
 	 *
-	 * Important: This method cannot be overriden. The actor specific message handling logic is
+	 * <p>Important: This method cannot be overriden. The actor specific message handling logic is
 	 * implemented by the method handleMessage.
 	 *
 	 * @param message Incoming message
@@ -54,14 +57,14 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 */
 	@Override
 	public final void onReceive(Object message) throws Exception {
-		if(LOG.isTraceEnabled()) {
+		if (LOG.isTraceEnabled()) {
 			LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());
 
 			long start = System.nanoTime();
 
 			handleLeaderSessionID(message);
 
-			long duration = (System.nanoTime() - start)/ 1_000_000;
+			long duration = (System.nanoTime() - start) / 1_000_000;
 
 			LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
 		} else {
@@ -124,14 +127,14 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 * Returns the current leader session ID associcated with this actor.
 	 * @return
 	 */
-	abstract protected UUID getLeaderSessionID();
+	protected abstract UUID getLeaderSessionID();
 
 	/**
 	 * This method should be called for every outgoing message. It wraps messages which require
 	 * a leader session ID (indicated by {@link RequiresLeaderSessionID}) in a
 	 * {@link LeaderSessionMessage} with the actor's leader session ID.
 	 *
-	 * This method can be overriden to implement a different decoration behavior.
+	 * <p>This method can be overriden to implement a different decoration behavior.
 	 *
 	 * @param message Message to be decorated
 	 * @return The deocrated message

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
index 21623e8..e6d3820 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
@@ -33,7 +33,7 @@ public interface QuarantineHandler {
 	 *                     actor system
 	 * @param actorSystem which has been quarantined
 	 */
-	void wasQuarantinedBy(final String remoteSystem, final ActorSystem actorSystem);
+	void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem);
 
 	/**
 	 * Callback when the given actor system has quarantined the given remote actor system.
@@ -42,5 +42,5 @@ public interface QuarantineHandler {
 	 *                     by our actor system
 	 * @param actorSystem which has quarantined the other actor system
 	 */
-	void hasQuarantined(final String remoteSystem, final ActorSystem actorSystem);
+	void hasQuarantined(String remoteSystem, ActorSystem actorSystem);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
index de82f29..c1075eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.runtime.akka;
 
+import org.apache.flink.util.Preconditions;
+
 import akka.actor.UntypedActor;
 import akka.remote.AssociationErrorEvent;
 import akka.remote.transport.Transport;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.util.regex.Matcher;
@@ -33,7 +34,7 @@ import java.util.regex.Pattern;
  * or quarantine another remote actor system. If the actor detects that the actor system has been
  * quarantined or quarantined another system, then the {@link QuarantineHandler} is called.
  *
- * IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
+ * <p>IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
  * quarantine state might be detected differently.
  */
 public class QuarantineMonitor extends UntypedActor {

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 3ffc68f..00a8475 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.runtime.akka;
 
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -36,6 +37,9 @@ import java.util.UUID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link FlinkUntypedActor}.
+ */
 public class FlinkUntypedActorTest {
 
 	private static ActorSystem actorSystem;
@@ -89,7 +93,7 @@ public class FlinkUntypedActorTest {
 
 		TestActorRef<PlainFlinkUntypedActor> actor = null;
 
-		try{
+		try {
 			final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID);
 			actor = TestActorRef.create(actorSystem, props);
 
@@ -113,7 +117,7 @@ public class FlinkUntypedActorTest {
 	}
 
 	private static void stopActor(ActorRef actor) {
-		if(actor != null) {
+		if (actor != null) {
 			actor.tell(Kill.getInstance(), ActorRef.noSender());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 97309a4..09e829e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.runtime.akka;
 
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
@@ -26,12 +33,6 @@ import akka.actor.UntypedActor;
 import akka.dispatch.OnComplete;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -40,12 +41,16 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests for {@link QuarantineMonitor}.
+ */
 public class QuarantineMonitorTest extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(QuarantineMonitorTest.class);