You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by kt...@apache.org on 2018/06/05 20:32:57 UTC

[fluo] branch master updated: Switch to leader latch WIP (#1037)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b5277c  Switch to leader latch WIP (#1037)
9b5277c is described below

commit 9b5277c9c79bb3a52bb879af591d1c51ae9476cc
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Tue Jun 5 16:32:44 2018 -0400

    Switch to leader latch WIP (#1037)
    
    These changes were made because the Curator LeaderSelector recipe
    was generating a lot of spurious error messages.  The LeaderLatch
    recipe did not generate as many spurious errors.
    
    See CURATOR-467, CURATOR-468, and CURATOR-469.
---
 .../apache/fluo/core/impl/CuratorCnxnListener.java |   8 +-
 .../org/apache/fluo/core/oracle/OracleClient.java  |  18 +--
 .../org/apache/fluo/core/oracle/OracleServer.java  | 161 ++++++++++++---------
 .../org/apache/fluo/core/util/CuratorUtil.java     |   5 +-
 .../org/apache/fluo/integration/impl/OracleIT.java |  38 ++---
 5 files changed, 125 insertions(+), 105 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java b/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
index 5711e86..d9de6a2 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -21,9 +21,9 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 
 public class CuratorCnxnListener implements ConnectionStateListener {
 
-  private boolean isConnected;
+  private volatile boolean isConnected;
 
-  public synchronized boolean isConnected() {
+  public boolean isConnected() {
     return isConnected;
   }
 
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index df80f8d..205634b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -31,7 +31,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
 import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
@@ -77,7 +77,7 @@ public class OracleClient implements AutoCloseable {
   private class TimestampRetriever extends LeaderSelectorListenerAdapter
       implements Runnable, PathChildrenCacheListener {
 
-    private LeaderSelector leaderSelector;
+    private LeaderLatch leaderLatch;
     private CuratorFramework curatorFramework;
     private OracleService.Client client;
     private PathChildrenCache pathChildrenCache;
@@ -104,7 +104,7 @@ public class OracleClient implements AutoCloseable {
             Thread.sleep(200);
           }
 
-          leaderSelector = new LeaderSelector(curatorFramework, ZookeeperPath.ORACLE_SERVER, this);
+          leaderLatch = new LeaderLatch(curatorFramework, ZookeeperPath.ORACLE_SERVER);
 
           pathChildrenCache =
               new PathChildrenCache(curatorFramework, ZookeeperPath.ORACLE_SERVER, true);
@@ -135,10 +135,10 @@ public class OracleClient implements AutoCloseable {
           || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)
           || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
 
-        Participant participant = leaderSelector.getLeader();
+        Participant participant = leaderLatch.getLeader();
         synchronized (this) {
           if (isLeader(participant)) {
-            currentLeader = leaderSelector.getLeader();
+            currentLeader = leaderLatch.getLeader();
           } else {
             currentLeader = null;
           }
@@ -280,14 +280,14 @@ public class OracleClient implements AutoCloseable {
 
       transport = null;
       pathChildrenCache = null;
-      leaderSelector = null;
+      leaderLatch = null;
       curatorFramework = null;
     }
 
     private boolean getLeaderAttempt() {
       Participant possibleLeader = null;
       try {
-        possibleLeader = leaderSelector.getLeader();
+        possibleLeader = leaderLatch.getLeader();
       } catch (KeeperException e) {
         log.debug("Exception throw in getLeaderAttempt()", e);
       } catch (Exception e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
index 0c7ddf0..0c35240 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -20,17 +20,21 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.Histogram;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.fluo.accumulo.util.LongUtil;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
@@ -41,6 +45,7 @@ import org.apache.fluo.core.metrics.MetricsUtil;
 import org.apache.fluo.core.thrift.OracleService;
 import org.apache.fluo.core.thrift.Stamps;
 import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.fluo.core.util.FluoThreadFactory;
 import org.apache.fluo.core.util.Halt;
 import org.apache.fluo.core.util.HostUtil;
 import org.apache.fluo.core.util.PortUtils;
@@ -63,15 +68,14 @@ import org.slf4j.LoggerFactory;
  * Oracle server is the responsible for providing incrementing logical timestamps to clients. It
  * should never give the same timestamp to two clients and it should always provide an incrementing
  * timestamp.
- * 
+ *
  * <p>
  * If multiple oracle servers are run, they will choose a leader and clients will automatically
  * connect to that leader. If the leader goes down, the client will automatically fail over to the
  * next leader. In the case where an oracle fails over, the next oracle will begin a new block of
  * timestamps.
  */
-public class OracleServer extends LeaderSelectorListenerAdapter
-    implements OracleService.Iface, PathChildrenCacheListener {
+public class OracleServer implements OracleService.Iface, PathChildrenCacheListener {
 
   private static final Logger log = LoggerFactory.getLogger(OracleServer.class);
 
@@ -88,7 +92,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter
   private volatile boolean started = false;
   private int port = 0;
 
-  private LeaderSelector leaderSelector;
+  private LeaderLatch leaderLatch;
+  private ExecutorService execService;
   private PathChildrenCache pathChildrenCache;
   private CuratorFramework curatorFramework;
   private CuratorCnxnListener cnxnListener;
@@ -253,7 +258,7 @@ public class OracleServer extends LeaderSelectorListenerAdapter
   }
 
   @Override
-  public boolean isLeader() throws TException {
+  public boolean isLeader() {
     return isLeader;
   }
 
@@ -272,6 +277,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter
   }
 
   private InetSocketAddress startServer() throws TTransportException {
+    Preconditions.checkState(
+        curatorFramework != null && curatorFramework.getState() == CuratorFrameworkState.STARTED);
 
     if (env.getConfiguration().containsKey(FluoConfigurationImpl.ORACLE_PORT_PROP)) {
       port = env.getConfiguration().getInt(FluoConfigurationImpl.ORACLE_PORT_PROP);
@@ -312,8 +319,6 @@ public class OracleServer extends LeaderSelectorListenerAdapter
       throw new IllegalStateException();
     }
 
-    final InetSocketAddress addr = startServer();
-
     curatorFramework = CuratorUtil.newAppCurator(env.getConfiguration());
     curatorFramework.getConnectionStateListenable().addListener(cnxnListener);
     curatorFramework.start();
@@ -322,11 +327,29 @@ public class OracleServer extends LeaderSelectorListenerAdapter
       Thread.sleep(200);
     }
 
-    leaderSelector = new LeaderSelector(curatorFramework, ZookeeperPath.ORACLE_SERVER, this);
+    final InetSocketAddress addr = startServer();
+
     String leaderId = HostUtil.getHostName() + ":" + addr.getPort();
-    leaderSelector.setId(leaderId);
+    leaderLatch = new LeaderLatch(curatorFramework, ZookeeperPath.ORACLE_SERVER, leaderId);
     log.info("Leader ID = " + leaderId);
-    leaderSelector.start();
+    execService = Executors.newSingleThreadExecutor(new FluoThreadFactory("Oracle Server Worker"));
+    leaderLatch.addListener(new LeaderLatchListener() {
+      @Override
+      public void notLeader() {
+        isLeader = false;
+
+        if (started) {
+          // if we stopped the server manually, we shouldn't halt
+          Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
+        }
+      }
+
+      @Override
+      public void isLeader() {
+        assumeLeadership();
+      }
+    }, execService);
+    leaderLatch.start();
 
     pathChildrenCache = new PathChildrenCache(curatorFramework, oraclePath, true);
     pathChildrenCache.getListenable().addListener(this);
@@ -341,9 +364,49 @@ public class OracleServer extends LeaderSelectorListenerAdapter
     started = true;
   }
 
+  private void assumeLeadership() {
+    Preconditions.checkState(!isLeader);
+
+    // sanity check- make sure previous oracle is no longer listening for connections
+    if (currentLeader != null) {
+      String[] address = currentLeader.getId().split(":");
+      String host = address[0];
+      int port = Integer.parseInt(address[1]);
+
+      OracleService.Client client = getOracleClient(host, port);
+      if (client != null) {
+        try {
+          while (client.isLeader()) {
+            Thread.sleep(500);
+          }
+        } catch (Exception e) {
+          log.debug("Exception thrown in takeLeadership()", e);
+        }
+      }
+    }
+
+    try {
+      synchronized (this) {
+        byte[] d = curatorFramework.getData().forPath(maxTsPath);
+        currentTs = maxTs = LongUtil.fromByteArray(d);
+      }
+
+      gcTsTracker = new GcTimestampTracker();
+      gcTsTracker.start();
+
+      isLeader = true;
+      log.info("Assumed leadership " + leaderLatch.getId());
+    } catch (Exception e) {
+      log.warn("Failed to become leader ", e);
+    }
+  }
+
   public synchronized void stop() throws Exception {
     if (started) {
 
+
+      isLeader = false;
+
       server.stop();
       serverThread.join();
 
@@ -357,8 +420,17 @@ public class OracleServer extends LeaderSelectorListenerAdapter
       if (curatorFramework.getState().equals(CuratorFrameworkState.STARTED)) {
         pathChildrenCache.getListenable().removeListener(this);
         pathChildrenCache.close();
-        leaderSelector.close();
-        curatorFramework.getConnectionStateListenable().removeListener(this);
+        leaderLatch.close();
+
+        execService.shutdown();
+
+        execService.awaitTermination(10, TimeUnit.SECONDS);
+
+        curatorFramework.getConnectionStateListenable().removeListener(cnxnListener);
+
+        // leaderLatch.close() schedules a background delete, give it a chance to process before
+        // closing curator... this is done to avoid spurious exceptions, see CURATOR-467
+        Uninterruptibles.sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
         curatorFramework.close();
       }
       log.info("Oracle server has been stopped.");
@@ -381,58 +453,6 @@ public class OracleServer extends LeaderSelectorListenerAdapter
     return null;
   }
 
-  /**
-   * Upon an oracle being elected the leader, it will need to adjust its starting timestamp to the
-   * last timestamp set in zookeeper.
-   *
-   * @param curatorFramework Curator framework
-   */
-  @Override
-  public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
-
-    try {
-      // sanity check- make sure previous oracle is no longer listening for connections
-      if (currentLeader != null) {
-        String[] address = currentLeader.getId().split(":");
-        String host = address[0];
-        int port = Integer.parseInt(address[1]);
-
-        OracleService.Client client = getOracleClient(host, port);
-        if (client != null) {
-          try {
-            while (client.isLeader()) {
-              Thread.sleep(500);
-            }
-          } catch (Exception e) {
-            log.debug("Exception thrown in takeLeadership()", e);
-          }
-        }
-      }
-
-      synchronized (this) {
-        byte[] d = curatorFramework.getData().forPath(maxTsPath);
-        currentTs = maxTs = LongUtil.fromByteArray(d);
-      }
-
-      gcTsTracker = new GcTimestampTracker();
-      gcTsTracker.start();
-
-      isLeader = true;
-
-      while (started) {
-        // if leadership is lost, then curator will interrupt the thread that called this method
-        Thread.sleep(100);
-      }
-    } finally {
-      isLeader = false;
-
-      if (started) {
-        // if we stopped the server manually, we shouldn't halt
-        Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
-      }
-    }
-  }
-
   @Override
   public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
       throws Exception {
@@ -442,8 +462,8 @@ public class OracleServer extends LeaderSelectorListenerAdapter
           || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)
           || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))) {
         synchronized (this) {
-          Participant participant = leaderSelector.getLeader();
-          if (isLeader(participant) && !leaderSelector.hasLeadership()) {
+          Participant participant = leaderLatch.getLeader();
+          if (isLeader(participant) && !leaderLatch.hasLeadership()) {
             // in case current instance becomes leader, we want to know who came before it.
             currentLeader = participant;
           }
@@ -453,5 +473,4 @@ public class OracleServer extends LeaderSelectorListenerAdapter
       log.warn("Oracle leadership watcher has been interrupted unexpectedly");
     }
   }
-
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
index 7e68999..1aa2d94 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -89,7 +89,6 @@ public class CuratorUtil {
   public static CuratorFramework newCurator(String zookeepers, int timeout, String secret) {
 
     final ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);
-
     if (secret.isEmpty()) {
       return CuratorFrameworkFactory.newClient(zookeepers, timeout, timeout, retry);
     } else {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
index ccd9e73..8c895ee 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -26,6 +26,7 @@ import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Supplier;
 
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.oracle.OracleClient;
@@ -180,7 +181,7 @@ public class OracleIT extends ITBaseImpl {
   @Test
   public void failover_newTimestampRequested() throws Exception {
 
-    sleepUntilConnected(oserver);
+    sleepUntil(oserver::isConnected);
 
     int port2 = PortUtils.getRandomFreePort();
     int port3 = PortUtils.getRandomFreePort();
@@ -189,10 +190,10 @@ public class OracleIT extends ITBaseImpl {
     TestOracle oserver3 = createExtraOracle(port3);
 
     oserver2.start();
-    sleepUntilConnected(oserver2);
+    sleepUntil(oserver2::isConnected);
 
     oserver3.start();
-    sleepUntilConnected(oserver3);
+    sleepUntil(oserver3::isConnected);
 
     OracleClient client = env.getSharedResources().getOracleClient();
 
@@ -205,15 +206,18 @@ public class OracleIT extends ITBaseImpl {
     assertTrue(client.getOracle().endsWith(Integer.toString(oserver.getPort())));
 
     oserver.stop();
-    sleepWhileConnected(oserver);
+    sleepWhile(oserver::isConnected);
+    sleepUntil(oserver2::isLeader);
 
     assertEquals(1002, client.getStamp().getTxTimestamp());
     assertTrue(client.getOracle().endsWith(Integer.toString(port2)));
 
     oserver2.stop();
-    sleepWhileConnected(oserver2);
+    sleepWhile(oserver2::isConnected);
     oserver2.close();
 
+    sleepUntil(oserver3::isLeader);
+
     assertEquals(2002, client.getStamp().getTxTimestamp());
     assertTrue(client.getOracle().endsWith(Integer.toString(port3)));
 
@@ -228,7 +232,7 @@ public class OracleIT extends ITBaseImpl {
   @Test
   public void singleOracle_goesAwayAndComesBack() throws Exception {
 
-    sleepUntilConnected(oserver);
+    sleepUntil(oserver::isConnected);
 
     OracleClient client = env.getSharedResources().getOracleClient();
 
@@ -239,7 +243,7 @@ public class OracleIT extends ITBaseImpl {
     }
 
     oserver.stop();
-    sleepWhileConnected(oserver);
+    sleepWhile(oserver::isConnected);
 
     while (client.getOracle() != null) {
       Thread.sleep(100);
@@ -248,7 +252,7 @@ public class OracleIT extends ITBaseImpl {
     assertNull(client.getOracle());
 
     oserver.start();
-    sleepUntilConnected(oserver);
+    sleepUntil(oserver::isConnected);
 
     assertEquals(1002, client.getStamp().getTxTimestamp());
 
@@ -273,12 +277,12 @@ public class OracleIT extends ITBaseImpl {
     TestOracle oserver2 = createExtraOracle(port2);
 
     oserver2.start();
-    sleepUntilConnected(oserver2);
+    sleepUntil(oserver2::isConnected);
 
     TestOracle oserver3 = createExtraOracle(port3);
 
     oserver3.start();
-    sleepUntilConnected(oserver3);
+    sleepUntil(oserver3::isConnected);
 
     for (int i = 0; i < numThreads; i++) {
       tpool.execute(new TimestampFetcher(numTimes, env, output, cdl));
@@ -318,14 +322,12 @@ public class OracleIT extends ITBaseImpl {
     oserver3.close();
   }
 
-  private void sleepWhileConnected(OracleServer oserver) throws InterruptedException {
-    while (oserver.isConnected()) {
-      Thread.sleep(100);
-    }
+  private void sleepUntil(Supplier<Boolean> condition) throws InterruptedException {
+    sleepWhile(() -> !condition.get());
   }
 
-  private void sleepUntilConnected(OracleServer oserver) throws InterruptedException {
-    while (!oserver.isConnected()) {
+  private void sleepWhile(Supplier<Boolean> condition) throws InterruptedException {
+    while (condition.get()) {
       Thread.sleep(100);
     }
   }

-- 
To stop receiving notification emails like this one, please contact
kturner@apache.org.