You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that did not survive conversion to plain text.
Posted to commits@fluo.apache.org by kt...@apache.org on 2018/06/05 20:32:57 UTC
[fluo] branch master updated: Switch to leader latch WIP (#1037)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push:
new 9b5277c Switch to leader latch WIP (#1037)
9b5277c is described below
commit 9b5277c9c79bb3a52bb879af591d1c51ae9476cc
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Tue Jun 5 16:32:44 2018 -0400
Switch to leader latch WIP (#1037)
These changes were made because the Curator LeaderSelector recipe
was generating many spurious error messages. The LeaderLatch
recipe generated fewer spurious errors.
See CURATOR-467, CURATOR-468, and CURATOR-469.
---
.../apache/fluo/core/impl/CuratorCnxnListener.java | 8 +-
.../org/apache/fluo/core/oracle/OracleClient.java | 18 +--
.../org/apache/fluo/core/oracle/OracleServer.java | 161 ++++++++++++---------
.../org/apache/fluo/core/util/CuratorUtil.java | 5 +-
.../org/apache/fluo/integration/impl/OracleIT.java | 38 ++---
5 files changed, 125 insertions(+), 105 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java b/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
index 5711e86..d9de6a2 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -21,9 +21,9 @@ import org.apache.curator.framework.state.ConnectionStateListener;
public class CuratorCnxnListener implements ConnectionStateListener {
- private boolean isConnected;
+ private volatile boolean isConnected;
- public synchronized boolean isConnected() {
+ public boolean isConnected() {
return isConnected;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index df80f8d..205634b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -31,7 +31,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.fluo.accumulo.util.ZookeeperPath;
@@ -77,7 +77,7 @@ public class OracleClient implements AutoCloseable {
private class TimestampRetriever extends LeaderSelectorListenerAdapter
implements Runnable, PathChildrenCacheListener {
- private LeaderSelector leaderSelector;
+ private LeaderLatch leaderLatch;
private CuratorFramework curatorFramework;
private OracleService.Client client;
private PathChildrenCache pathChildrenCache;
@@ -104,7 +104,7 @@ public class OracleClient implements AutoCloseable {
Thread.sleep(200);
}
- leaderSelector = new LeaderSelector(curatorFramework, ZookeeperPath.ORACLE_SERVER, this);
+ leaderLatch = new LeaderLatch(curatorFramework, ZookeeperPath.ORACLE_SERVER);
pathChildrenCache =
new PathChildrenCache(curatorFramework, ZookeeperPath.ORACLE_SERVER, true);
@@ -135,10 +135,10 @@ public class OracleClient implements AutoCloseable {
|| event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)
|| event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
- Participant participant = leaderSelector.getLeader();
+ Participant participant = leaderLatch.getLeader();
synchronized (this) {
if (isLeader(participant)) {
- currentLeader = leaderSelector.getLeader();
+ currentLeader = leaderLatch.getLeader();
} else {
currentLeader = null;
}
@@ -280,14 +280,14 @@ public class OracleClient implements AutoCloseable {
transport = null;
pathChildrenCache = null;
- leaderSelector = null;
+ leaderLatch = null;
curatorFramework = null;
}
private boolean getLeaderAttempt() {
Participant possibleLeader = null;
try {
- possibleLeader = leaderSelector.getLeader();
+ possibleLeader = leaderLatch.getLeader();
} catch (KeeperException e) {
log.debug("Exception throw in getLeaderAttempt()", e);
} catch (Exception e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
index 0c7ddf0..0c35240 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -20,17 +20,21 @@ import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Histogram;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.fluo.accumulo.util.LongUtil;
import org.apache.fluo.accumulo.util.ZookeeperPath;
@@ -41,6 +45,7 @@ import org.apache.fluo.core.metrics.MetricsUtil;
import org.apache.fluo.core.thrift.OracleService;
import org.apache.fluo.core.thrift.Stamps;
import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.fluo.core.util.FluoThreadFactory;
import org.apache.fluo.core.util.Halt;
import org.apache.fluo.core.util.HostUtil;
import org.apache.fluo.core.util.PortUtils;
@@ -63,15 +68,14 @@ import org.slf4j.LoggerFactory;
* Oracle server is the responsible for providing incrementing logical timestamps to clients. It
* should never give the same timestamp to two clients and it should always provide an incrementing
* timestamp.
- *
+ *
* <p>
* If multiple oracle servers are run, they will choose a leader and clients will automatically
* connect to that leader. If the leader goes down, the client will automatically fail over to the
* next leader. In the case where an oracle fails over, the next oracle will begin a new block of
* timestamps.
*/
-public class OracleServer extends LeaderSelectorListenerAdapter
- implements OracleService.Iface, PathChildrenCacheListener {
+public class OracleServer implements OracleService.Iface, PathChildrenCacheListener {
private static final Logger log = LoggerFactory.getLogger(OracleServer.class);
@@ -88,7 +92,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter
private volatile boolean started = false;
private int port = 0;
- private LeaderSelector leaderSelector;
+ private LeaderLatch leaderLatch;
+ private ExecutorService execService;
private PathChildrenCache pathChildrenCache;
private CuratorFramework curatorFramework;
private CuratorCnxnListener cnxnListener;
@@ -253,7 +258,7 @@ public class OracleServer extends LeaderSelectorListenerAdapter
}
@Override
- public boolean isLeader() throws TException {
+ public boolean isLeader() {
return isLeader;
}
@@ -272,6 +277,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter
}
private InetSocketAddress startServer() throws TTransportException {
+ Preconditions.checkState(
+ curatorFramework != null && curatorFramework.getState() == CuratorFrameworkState.STARTED);
if (env.getConfiguration().containsKey(FluoConfigurationImpl.ORACLE_PORT_PROP)) {
port = env.getConfiguration().getInt(FluoConfigurationImpl.ORACLE_PORT_PROP);
@@ -312,8 +319,6 @@ public class OracleServer extends LeaderSelectorListenerAdapter
throw new IllegalStateException();
}
- final InetSocketAddress addr = startServer();
-
curatorFramework = CuratorUtil.newAppCurator(env.getConfiguration());
curatorFramework.getConnectionStateListenable().addListener(cnxnListener);
curatorFramework.start();
@@ -322,11 +327,29 @@ public class OracleServer extends LeaderSelectorListenerAdapter
Thread.sleep(200);
}
- leaderSelector = new LeaderSelector(curatorFramework, ZookeeperPath.ORACLE_SERVER, this);
+ final InetSocketAddress addr = startServer();
+
String leaderId = HostUtil.getHostName() + ":" + addr.getPort();
- leaderSelector.setId(leaderId);
+ leaderLatch = new LeaderLatch(curatorFramework, ZookeeperPath.ORACLE_SERVER, leaderId);
log.info("Leader ID = " + leaderId);
- leaderSelector.start();
+ execService = Executors.newSingleThreadExecutor(new FluoThreadFactory("Oracle Server Worker"));
+ leaderLatch.addListener(new LeaderLatchListener() {
+ @Override
+ public void notLeader() {
+ isLeader = false;
+
+ if (started) {
+ // if we stopped the server manually, we shouldn't halt
+ Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
+ }
+ }
+
+ @Override
+ public void isLeader() {
+ assumeLeadership();
+ }
+ }, execService);
+ leaderLatch.start();
pathChildrenCache = new PathChildrenCache(curatorFramework, oraclePath, true);
pathChildrenCache.getListenable().addListener(this);
@@ -341,9 +364,49 @@ public class OracleServer extends LeaderSelectorListenerAdapter
started = true;
}
+ private void assumeLeadership() {
+ Preconditions.checkState(!isLeader);
+
+ // sanity check- make sure previous oracle is no longer listening for connections
+ if (currentLeader != null) {
+ String[] address = currentLeader.getId().split(":");
+ String host = address[0];
+ int port = Integer.parseInt(address[1]);
+
+ OracleService.Client client = getOracleClient(host, port);
+ if (client != null) {
+ try {
+ while (client.isLeader()) {
+ Thread.sleep(500);
+ }
+ } catch (Exception e) {
+ log.debug("Exception thrown in takeLeadership()", e);
+ }
+ }
+ }
+
+ try {
+ synchronized (this) {
+ byte[] d = curatorFramework.getData().forPath(maxTsPath);
+ currentTs = maxTs = LongUtil.fromByteArray(d);
+ }
+
+ gcTsTracker = new GcTimestampTracker();
+ gcTsTracker.start();
+
+ isLeader = true;
+ log.info("Assumed leadership " + leaderLatch.getId());
+ } catch (Exception e) {
+ log.warn("Failed to become leader ", e);
+ }
+ }
+
public synchronized void stop() throws Exception {
if (started) {
+
+ isLeader = false;
+
server.stop();
serverThread.join();
@@ -357,8 +420,17 @@ public class OracleServer extends LeaderSelectorListenerAdapter
if (curatorFramework.getState().equals(CuratorFrameworkState.STARTED)) {
pathChildrenCache.getListenable().removeListener(this);
pathChildrenCache.close();
- leaderSelector.close();
- curatorFramework.getConnectionStateListenable().removeListener(this);
+ leaderLatch.close();
+
+ execService.shutdown();
+
+ execService.awaitTermination(10, TimeUnit.SECONDS);
+
+ curatorFramework.getConnectionStateListenable().removeListener(cnxnListener);
+
+ // leaderLatch.close() schedules a background delete, give it a chance to process before
+ // closing curator... this is done to avoid spurious exceptions, see CURATOR-467
+ Uninterruptibles.sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
curatorFramework.close();
}
log.info("Oracle server has been stopped.");
@@ -381,58 +453,6 @@ public class OracleServer extends LeaderSelectorListenerAdapter
return null;
}
- /**
- * Upon an oracle being elected the leader, it will need to adjust its starting timestamp to the
- * last timestamp set in zookeeper.
- *
- * @param curatorFramework Curator framework
- */
- @Override
- public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
-
- try {
- // sanity check- make sure previous oracle is no longer listening for connections
- if (currentLeader != null) {
- String[] address = currentLeader.getId().split(":");
- String host = address[0];
- int port = Integer.parseInt(address[1]);
-
- OracleService.Client client = getOracleClient(host, port);
- if (client != null) {
- try {
- while (client.isLeader()) {
- Thread.sleep(500);
- }
- } catch (Exception e) {
- log.debug("Exception thrown in takeLeadership()", e);
- }
- }
- }
-
- synchronized (this) {
- byte[] d = curatorFramework.getData().forPath(maxTsPath);
- currentTs = maxTs = LongUtil.fromByteArray(d);
- }
-
- gcTsTracker = new GcTimestampTracker();
- gcTsTracker.start();
-
- isLeader = true;
-
- while (started) {
- // if leadership is lost, then curator will interrupt the thread that called this method
- Thread.sleep(100);
- }
- } finally {
- isLeader = false;
-
- if (started) {
- // if we stopped the server manually, we shouldn't halt
- Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
- }
- }
- }
-
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
throws Exception {
@@ -442,8 +462,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter
|| event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)
|| event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))) {
synchronized (this) {
- Participant participant = leaderSelector.getLeader();
- if (isLeader(participant) && !leaderSelector.hasLeadership()) {
+ Participant participant = leaderLatch.getLeader();
+ if (isLeader(participant) && !leaderLatch.hasLeadership()) {
// in case current instance becomes leader, we want to know who came before it.
currentLeader = participant;
}
@@ -453,5 +473,4 @@ public class OracleServer extends LeaderSelectorListenerAdapter
log.warn("Oracle leadership watcher has been interrupted unexpectedly");
}
}
-
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
index 7e68999..1aa2d94 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -89,7 +89,6 @@ public class CuratorUtil {
public static CuratorFramework newCurator(String zookeepers, int timeout, String secret) {
final ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);
-
if (secret.isEmpty()) {
return CuratorFrameworkFactory.newClient(zookeepers, timeout, timeout, retry);
} else {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
index ccd9e73..8c895ee 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
@@ -26,6 +26,7 @@ import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.Supplier;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.oracle.OracleClient;
@@ -180,7 +181,7 @@ public class OracleIT extends ITBaseImpl {
@Test
public void failover_newTimestampRequested() throws Exception {
- sleepUntilConnected(oserver);
+ sleepUntil(oserver::isConnected);
int port2 = PortUtils.getRandomFreePort();
int port3 = PortUtils.getRandomFreePort();
@@ -189,10 +190,10 @@ public class OracleIT extends ITBaseImpl {
TestOracle oserver3 = createExtraOracle(port3);
oserver2.start();
- sleepUntilConnected(oserver2);
+ sleepUntil(oserver2::isConnected);
oserver3.start();
- sleepUntilConnected(oserver3);
+ sleepUntil(oserver3::isConnected);
OracleClient client = env.getSharedResources().getOracleClient();
@@ -205,15 +206,18 @@ public class OracleIT extends ITBaseImpl {
assertTrue(client.getOracle().endsWith(Integer.toString(oserver.getPort())));
oserver.stop();
- sleepWhileConnected(oserver);
+ sleepWhile(oserver::isConnected);
+ sleepUntil(oserver2::isLeader);
assertEquals(1002, client.getStamp().getTxTimestamp());
assertTrue(client.getOracle().endsWith(Integer.toString(port2)));
oserver2.stop();
- sleepWhileConnected(oserver2);
+ sleepWhile(oserver2::isConnected);
oserver2.close();
+ sleepUntil(oserver3::isLeader);
+
assertEquals(2002, client.getStamp().getTxTimestamp());
assertTrue(client.getOracle().endsWith(Integer.toString(port3)));
@@ -228,7 +232,7 @@ public class OracleIT extends ITBaseImpl {
@Test
public void singleOracle_goesAwayAndComesBack() throws Exception {
- sleepUntilConnected(oserver);
+ sleepUntil(oserver::isConnected);
OracleClient client = env.getSharedResources().getOracleClient();
@@ -239,7 +243,7 @@ public class OracleIT extends ITBaseImpl {
}
oserver.stop();
- sleepWhileConnected(oserver);
+ sleepWhile(oserver::isConnected);
while (client.getOracle() != null) {
Thread.sleep(100);
@@ -248,7 +252,7 @@ public class OracleIT extends ITBaseImpl {
assertNull(client.getOracle());
oserver.start();
- sleepUntilConnected(oserver);
+ sleepUntil(oserver::isConnected);
assertEquals(1002, client.getStamp().getTxTimestamp());
@@ -273,12 +277,12 @@ public class OracleIT extends ITBaseImpl {
TestOracle oserver2 = createExtraOracle(port2);
oserver2.start();
- sleepUntilConnected(oserver2);
+ sleepUntil(oserver2::isConnected);
TestOracle oserver3 = createExtraOracle(port3);
oserver3.start();
- sleepUntilConnected(oserver3);
+ sleepUntil(oserver3::isConnected);
for (int i = 0; i < numThreads; i++) {
tpool.execute(new TimestampFetcher(numTimes, env, output, cdl));
@@ -318,14 +322,12 @@ public class OracleIT extends ITBaseImpl {
oserver3.close();
}
- private void sleepWhileConnected(OracleServer oserver) throws InterruptedException {
- while (oserver.isConnected()) {
- Thread.sleep(100);
- }
+ private void sleepUntil(Supplier<Boolean> condition) throws InterruptedException {
+ sleepWhile(() -> !condition.get());
}
- private void sleepUntilConnected(OracleServer oserver) throws InterruptedException {
- while (!oserver.isConnected()) {
+ private void sleepWhile(Supplier<Boolean> condition) throws InterruptedException {
+ while (condition.get()) {
Thread.sleep(100);
}
}
--
To stop receiving notification emails like this one, please contact
kturner@apache.org.