You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/09/23 11:19:46 UTC
flink git commit: [test-stability] Replaces Curator's TestingCluster
with TestingServer in ZooKeeperElection/RetrievalTests. Increased the timeout
to 200s in ZooKeeperLeaderElectionTest.
Repository: flink
Updated Branches:
refs/heads/master d58caa8ec -> b5a3f55a3
[test-stability] Replaces Curator's TestingCluster with TestingServer in ZooKeeperElection/RetrievalTests. Increases the timeout to 200s in ZooKeeperLeaderElectionTest.
Adds logging statements to ZooKeeperLeaderElection/RetrievalService
Adds more debug logging and enables debug logging for ZooKeeperLeaderElection/RetrievalService
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5a3f55a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5a3f55a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5a3f55a
Branch: refs/heads/master
Commit: b5a3f55a3a5cf8317f5e655bff89a8adad658b9d
Parents: d58caa8
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 16 16:01:13 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 23 11:08:38 2015 +0200
----------------------------------------------------------------------
.../ZooKeeperLeaderElectionService.java | 59 +++++++++++++++++++-
.../ZooKeeperLeaderRetrievalService.java | 7 +++
.../JobManagerLeaderElectionTest.java | 19 +++----
.../leaderelection/TestingContender.java | 8 +++
.../runtime/leaderelection/TestingListener.java | 5 ++
.../ZooKeeperLeaderElectionTest.java | 28 +++++-----
.../ZooKeeperLeaderRetrievalTest.java | 17 +++---
tools/log4j-travis.properties | 4 +-
8 files changed, 110 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b5a3f55a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 709d031..ae3f0e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -122,6 +122,13 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
@Override
public void confirmLeaderSessionID(UUID leaderSessionID) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Confirm leader session ID {} for leader {}.",
+ leaderSessionID,
+ leaderContender.getAddress());
+ }
+
Preconditions.checkNotNull(leaderSessionID);
if(leaderLatch.hasLeadership()) {
@@ -133,8 +140,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
}
}
} else {
- LOG.warn("The leader session ID " + leaderSessionID + " was confirmed even though the" +
- "corresponding JobManager was not elected as the leader.");
+ LOG.warn("The leader session ID {} was confirmed even though the" +
+ "corresponding JobManager was not elected as the leader.", leaderSessionID);
}
}
@@ -152,6 +159,14 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
synchronized (lock) {
issuedLeaderSessionID = UUID.randomUUID();
confirmedLeaderSessionID = null;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Grant leadership to contender {} with session ID {}.",
+ leaderContender.getAddress(),
+ issuedLeaderSessionID);
+ }
+
leaderContender.grantLeadership(issuedLeaderSessionID);
}
}
@@ -161,6 +176,11 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
synchronized (lock) {
issuedLeaderSessionID = null;
confirmedLeaderSessionID = null;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Revoke leadership of {}.", leaderContender.getAddress());
+ }
+
leaderContender.revokeLeadership();
}
}
@@ -171,16 +191,33 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
// leaderSessionID is null if the leader contender has not yet confirmed the session ID
if (leaderLatch.hasLeadership()) {
synchronized (lock) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Leader node changed while {} is the leader with session ID {}.",
+ leaderContender.getAddress(),
+ confirmedLeaderSessionID);
+ }
+
if (confirmedLeaderSessionID != null) {
ChildData childData = cache.getCurrentData();
if (childData == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Writing leader information into empty node by {}.",
+ leaderContender.getAddress());
+ }
writeLeaderInformation(confirmedLeaderSessionID);
} else {
byte[] data = childData.getData();
if (data == null || data.length == 0) {
// the data field seems to be empty, rewrite information
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Writing leader information into node with empty data field by {}.",
+ leaderContender.getAddress());
+ }
writeLeaderInformation(confirmedLeaderSessionID);
} else {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
@@ -192,6 +229,11 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
(leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
// the data field does not correspond to the expected leader information
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Correcting leader information by {}.",
+ leaderContender.getAddress());
+ }
writeLeaderInformation(confirmedLeaderSessionID);
}
}
@@ -214,6 +256,12 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
// this method does not have to be synchronized because the curator framework client
// is thread-safe
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Write leader information: Leader={}, session ID={}.",
+ leaderContender.getAddress(),
+ leaderSessionID);
+ }
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
@@ -258,6 +306,13 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
}
}
}
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Successfully wrote leader information: Leader={}, session ID={}.",
+ leaderContender.getAddress(),
+ leaderSessionID);
+ }
} catch (Exception e) {
leaderContender.handleError(
new Exception("Could not write leader address and leader session ID to " +
http://git-wip-us.apache.org/repos/asf/flink/blob/b5a3f55a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index 20ed4d3..d17133a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -89,6 +89,8 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
@Override
public void nodeChanged() throws Exception {
try {
+ LOG.debug("Leader node has changed.");
+
ChildData childData = cache.getCurrentData();
String leaderAddress;
@@ -114,6 +116,11 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
if(!(Objects.equals(leaderAddress, lastLeaderAddress) &&
Objects.equals(leaderSessionID, lastLeaderSessionID))) {
+ LOG.debug(
+ "New leader information: Leader={}, session ID={}.",
+ leaderAddress,
+ leaderSessionID);
+
lastLeaderAddress = leaderAddress;
lastLeaderSessionID = leaderSessionID;
leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
http://git-wip-us.apache.org/repos/asf/flink/blob/b5a3f55a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index e6067d0..753bbab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -25,7 +25,7 @@ import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
-import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
@@ -51,7 +51,7 @@ import java.util.concurrent.TimeUnit;
public class JobManagerLeaderElectionTest extends TestLogger {
private static ActorSystem actorSystem;
- private static TestingCluster testingCluster;
+ private static TestingServer testingServer;
private static Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());
private static FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);
@@ -59,8 +59,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
public static void setup() throws Exception {
actorSystem = ActorSystem.create("TestingActorSystem");
- testingCluster = new TestingCluster(3);
- testingCluster.start();
+ testingServer = new TestingServer();
}
@AfterClass
@@ -69,8 +68,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
JavaTestKit.shutdownActorSystem(actorSystem);
}
- if(testingCluster != null) {
- testingCluster.stop();
+ if(testingServer != null) {
+ testingServer.stop();
}
}
@@ -83,8 +82,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
configuration.setString(
- ConfigConstants.ZOOKEEPER_QUORUM_KEY,
- testingCluster.getConnectString());
+ ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+ testingServer.getConnectString());
ActorRef jm = null;
@@ -114,8 +113,8 @@ public class JobManagerLeaderElectionTest extends TestLogger {
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
configuration.setString(
- ConfigConstants.ZOOKEEPER_QUORUM_KEY,
- testingCluster.getConnectString());
+ ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+ testingServer.getConnectString());
ActorRef jm;
ActorRef jm2 = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/b5a3f55a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java
index 9b2ab60..b03c165 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java
@@ -18,6 +18,9 @@
package org.apache.flink.runtime.leaderelection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -26,6 +29,7 @@ import java.util.concurrent.TimeoutException;
* purposes.
*/
public class TestingContender implements LeaderContender {
+ private static final Logger LOG = LoggerFactory.getLogger(TestingContender.class);
private final String address;
private final LeaderElectionService leaderElectionService;
@@ -110,6 +114,8 @@ public class TestingContender implements LeaderContender {
@Override
public void grantLeadership(UUID leaderSessionID) {
synchronized (lock) {
+ LOG.debug("Was granted leadership with session ID {}.", leaderSessionID);
+
this.leaderSessionID = leaderSessionID;
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
@@ -123,6 +129,8 @@ public class TestingContender implements LeaderContender {
@Override
public void revokeLeadership() {
synchronized (lock) {
+ LOG.debug("Was revoked leadership. Old session ID {}.", leaderSessionID);
+
leader = false;
leaderSessionID = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/b5a3f55a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
index 4372414..7b3d06f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -28,6 +30,7 @@ import java.util.concurrent.TimeoutException;
* testing purposes.
*/
public class TestingListener implements LeaderRetrievalListener {
+ private static Logger LOG = LoggerFactory.getLogger(TestingListener.class);
private String address;
private String oldAddress;
@@ -101,6 +104,8 @@ public class TestingListener implements LeaderRetrievalListener {
@Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
synchronized (lock) {
+ LOG.debug("Notified about new leader address {} with session ID {}.", leaderAddress, leaderSessionID);
+
this.address = leaderAddress;
this.leaderSessionID = leaderSessionID;
http://git-wip-us.apache.org/repos/asf/flink/blob/b5a3f55a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 16629b3..7c7867a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -24,7 +24,7 @@ import org.apache.curator.framework.api.ProtectACLCreateModePathAndBytesable;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
-import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
@@ -54,16 +54,14 @@ import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
public class ZooKeeperLeaderElectionTest extends TestLogger {
- private TestingCluster testingCluster;
+ private TestingServer testingServer;
private static final String TEST_URL = "akka//user/jobmanager";
- private static final FiniteDuration timeout = new FiniteDuration(60000, TimeUnit.MILLISECONDS);
+ private static final FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS);
@Before
public void before() {
- testingCluster = new TestingCluster(3);
-
try {
- testingCluster.start();
+ testingServer = new TestingServer();
} catch (Exception e) {
throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
}
@@ -72,12 +70,12 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@After
public void after() {
try {
- testingCluster.stop();
+ testingServer.stop();
} catch (Exception e) {
throw new RuntimeException("Could not stop ZooKeeper testing cluster.", e);
}
- testingCluster = null;
+ testingServer = null;
}
/**
@@ -86,7 +84,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testZooKeeperLeaderElectionRetrieval() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString());
+ configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -131,10 +129,10 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testZooKeeperReelection() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString());
+ configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
- int num = 100;
+ int num = 50;
ZooKeeperLeaderElectionService[] leaderElectionService = new ZooKeeperLeaderElectionService[num];
TestingContender[] contenders = new TestingContender[num];
@@ -200,7 +198,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testZooKeeperReelectionWithReplacement() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString());
+ configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
int num = 3;
@@ -281,7 +279,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
final String leaderPath = "/leader";
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString());
+ configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
configuration.setString(ConfigConstants.ZOOKEEPER_LEADER_PATH, leaderPath);
@@ -359,7 +357,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testExceptionForwarding() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString());
+ configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
ZooKeeperLeaderElectionService leaderElectionService = null;
@@ -428,7 +426,7 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
@Test
public void testEphemeralZooKeeperNodes() throws Exception {
Configuration configuration = new Configuration();
- configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString());
+ configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
ZooKeeperLeaderElectionService leaderElectionService;
http://git-wip-us.apache.org/repos/asf/flink/blob/b5a3f55a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 87161c6..bb60415 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.leaderelection;
-import org.apache.curator.test.TestingCluster;
+import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -45,13 +45,12 @@ import static org.junit.Assert.*;
public class ZooKeeperLeaderRetrievalTest extends TestLogger{
- private TestingCluster testingCluster;
+ private TestingServer testingServer;
@Before
public void before() {
- testingCluster = new TestingCluster(3);
try {
- testingCluster.start();
+ testingServer = new TestingServer();
} catch (Exception e) {
throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
}
@@ -59,13 +58,13 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
@After
public void after() {
- if(testingCluster != null) {
+ if(testingServer != null) {
try {
- testingCluster.stop();
+ testingServer.stop();
} catch (IOException e) {
throw new RuntimeException("Could not stop ZooKeeper testing cluster.", e);
}
- testingCluster = null;
+ testingServer = null;
}
}
@@ -83,7 +82,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
long sleepingTime = 1000;
config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
- config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString());
+ config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
LeaderElectionService leaderElectionService = null;
LeaderElectionService faultyLeaderElectionService;
@@ -168,7 +167,7 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
public void testTimeoutOfFindConnectingAddress() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
- config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingCluster.getConnectString());
+ config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/flink/blob/b5a3f55a/tools/log4j-travis.properties
----------------------------------------------------------------------
diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index d031ea5..1cdd152 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -37,7 +37,9 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
log4j.logger.org.apache.zookeeper=ERROR, file
-
+log4j.logger.org.apache.zookeeper.server.quorum.QuorumCnxManager=OFF, file
+log4j.logger.org.apache.flink.runtime.leaderelection=DEBUG,file
+log4j.logger.org.apache.flink.runtime.leaderretrieval=DEBUG,file
# Log a bit when running the flink-yarn-tests to avoid running into the 5 minutes timeout for
# the tests
log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console