You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/11 13:57:20 UTC
[2/7] flink git commit: [FLINK-6903] [runtime] Activate checkstyle
for runtime/akka
[FLINK-6903] [runtime] Activate checkstyle for runtime/akka
This closes #4114
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b91df160
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b91df160
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b91df160
Branch: refs/heads/master
Commit: b91df16071145911a85a3e76a821a7ff6709b34f
Parents: a68c15f
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jun 12 13:50:07 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 07:49:35 2017 -0400
----------------------------------------------------------------------
flink-runtime/pom.xml | 1 -
.../runtime/akka/DefaultQuarantineHandler.java | 6 ++++--
.../flink/runtime/akka/FlinkUntypedActor.java | 17 ++++++++++-------
.../flink/runtime/akka/QuarantineHandler.java | 4 ++--
.../flink/runtime/akka/QuarantineMonitor.java | 5 +++--
.../runtime/akka/FlinkUntypedActorTest.java | 14 +++++++++-----
.../runtime/akka/QuarantineMonitorTest.java | 19 ++++++++++++-------
7 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 8800f74..5644cfd 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -426,7 +426,6 @@ under the License.
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
<excludes>
- **/runtime/akka/**,
**/runtime/blob/**,
**/runtime/checkpoint/**,
**/runtime/client/**,
http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
index 378cb25..708437f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -18,11 +18,13 @@
package org.apache.flink.runtime.akka;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
import org.slf4j.Logger;
+
import scala.concurrent.duration.FiniteDuration;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 05ae501..8078c26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.akka;
-import akka.actor.UntypedActor;
import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import akka.actor.UntypedActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,20 +34,22 @@ import java.util.UUID;
* of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is
* detected, then an Exception is thrown.
*
- * In order to implement the actor behavior, an implementing subclass has to override the method
+ * <p>In order to implement the actor behavior, an implementing subclass has to override the method
* handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide
* a leader session ID option which is returned by getLeaderSessionID.
*/
public abstract class FlinkUntypedActor extends UntypedActor {
+ //CHECKSTYLE.OFF: MemberNameCheck - re-enable after JobManager/TaskManager refactoring in FLIP-6?
protected final Logger LOG = LoggerFactory.getLogger(getClass());
+ //CHECKSTYLE.ON: MemberNameCheck
/**
* This method is called by Akka if a new message has arrived for the actor. It logs the
* processing time of the incoming message if the logging level is set to debug. After logging
* the handleLeaderSessionID method is called.
*
- * Important: This method cannot be overriden. The actor specific message handling logic is
+ * <p>Important: This method cannot be overriden. The actor specific message handling logic is
* implemented by the method handleMessage.
*
* @param message Incoming message
@@ -54,14 +57,14 @@ public abstract class FlinkUntypedActor extends UntypedActor {
*/
@Override
public final void onReceive(Object message) throws Exception {
- if(LOG.isTraceEnabled()) {
+ if (LOG.isTraceEnabled()) {
LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());
long start = System.nanoTime();
handleLeaderSessionID(message);
- long duration = (System.nanoTime() - start)/ 1_000_000;
+ long duration = (System.nanoTime() - start) / 1_000_000;
LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
} else {
@@ -124,14 +127,14 @@ public abstract class FlinkUntypedActor extends UntypedActor {
* Returns the current leader session ID associcated with this actor.
* @return
*/
- abstract protected UUID getLeaderSessionID();
+ protected abstract UUID getLeaderSessionID();
/**
* This method should be called for every outgoing message. It wraps messages which require
* a leader session ID (indicated by {@link RequiresLeaderSessionID}) in a
* {@link LeaderSessionMessage} with the actor's leader session ID.
*
- * This method can be overriden to implement a different decoration behavior.
+ * <p>This method can be overriden to implement a different decoration behavior.
*
* @param message Message to be decorated
* @return The deocrated message
http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
index 21623e8..e6d3820 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
@@ -33,7 +33,7 @@ public interface QuarantineHandler {
* actor system
* @param actorSystem which has been quarantined
*/
- void wasQuarantinedBy(final String remoteSystem, final ActorSystem actorSystem);
+ void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem);
/**
* Callback when the given actor system has quarantined the given remote actor system.
@@ -42,5 +42,5 @@ public interface QuarantineHandler {
* by our actor system
* @param actorSystem which has quarantined the other actor system
*/
- void hasQuarantined(final String remoteSystem, final ActorSystem actorSystem);
+ void hasQuarantined(String remoteSystem, ActorSystem actorSystem);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
index de82f29..c1075eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
@@ -18,10 +18,11 @@
package org.apache.flink.runtime.akka;
+import org.apache.flink.util.Preconditions;
+
import akka.actor.UntypedActor;
import akka.remote.AssociationErrorEvent;
import akka.remote.transport.Transport;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import java.util.regex.Matcher;
@@ -33,7 +34,7 @@ import java.util.regex.Pattern;
* or quarantine another remote actor system. If the actor detects that the actor system has been
* quarantined or quarantined another system, then the {@link QuarantineHandler} is called.
*
- * IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
+ * <p>IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
* quarantine state might be detected differently.
*/
public class QuarantineMonitor extends UntypedActor {
http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 3ffc68f..00a8475 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -18,15 +18,16 @@
package org.apache.flink.runtime.akka;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -36,6 +37,9 @@ import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link FlinkUntypedActor}.
+ */
public class FlinkUntypedActorTest {
private static ActorSystem actorSystem;
@@ -89,7 +93,7 @@ public class FlinkUntypedActorTest {
TestActorRef<PlainFlinkUntypedActor> actor = null;
- try{
+ try {
final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID);
actor = TestActorRef.create(actorSystem, props);
@@ -113,7 +117,7 @@ public class FlinkUntypedActorTest {
}
private static void stopActor(ActorRef actor) {
- if(actor != null) {
+ if (actor != null) {
actor.tell(Kill.getInstance(), ActorRef.noSender());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 97309a4..09e829e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,6 +18,13 @@
package org.apache.flink.runtime.akka;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
@@ -26,12 +33,6 @@ import akka.actor.UntypedActor;
import akka.dispatch.OnComplete;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -40,12 +41,16 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests for {@link QuarantineMonitor}.
+ */
public class QuarantineMonitorTest extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(QuarantineMonitorTest.class);