You are viewing a plain-text version of this content. The canonical version is available in the original mailing-list archive (the hyperlink to it was lost in the plain-text conversion).
Posted to notifications@fluo.apache.org by GitBox <gi...@apache.org> on 2018/06/05 20:32:46 UTC

[GitHub] keith-turner closed pull request #1037: Switch to leader latch WIP

keith-turner closed pull request #1037: Switch to leader latch WIP
URL: https://github.com/apache/fluo/pull/1037
 
 
   

This is a pull request (PR) merged from a forked repository.
Because GitHub hides the original diff on merge, the diff is displayed below
for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java b/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
index 5711e86c..d9de6a20 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/CuratorCnxnListener.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -21,9 +21,9 @@
 
 public class CuratorCnxnListener implements ConnectionStateListener {
 
-  private boolean isConnected;
+  private volatile boolean isConnected;
 
-  public synchronized boolean isConnected() {
+  public boolean isConnected() {
     return isConnected;
   }
 
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index df80f8d1..205634bc 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -31,7 +31,7 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
 import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
@@ -77,7 +77,7 @@
   private class TimestampRetriever extends LeaderSelectorListenerAdapter
       implements Runnable, PathChildrenCacheListener {
 
-    private LeaderSelector leaderSelector;
+    private LeaderLatch leaderLatch;
     private CuratorFramework curatorFramework;
     private OracleService.Client client;
     private PathChildrenCache pathChildrenCache;
@@ -104,7 +104,7 @@ public void run() {
             Thread.sleep(200);
           }
 
-          leaderSelector = new LeaderSelector(curatorFramework, ZookeeperPath.ORACLE_SERVER, this);
+          leaderLatch = new LeaderLatch(curatorFramework, ZookeeperPath.ORACLE_SERVER);
 
           pathChildrenCache =
               new PathChildrenCache(curatorFramework, ZookeeperPath.ORACLE_SERVER, true);
@@ -135,10 +135,10 @@ public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent
           || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)
           || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
 
-        Participant participant = leaderSelector.getLeader();
+        Participant participant = leaderLatch.getLeader();
         synchronized (this) {
           if (isLeader(participant)) {
-            currentLeader = leaderSelector.getLeader();
+            currentLeader = leaderLatch.getLeader();
           } else {
             currentLeader = null;
           }
@@ -280,14 +280,14 @@ private synchronized void close() {
 
       transport = null;
       pathChildrenCache = null;
-      leaderSelector = null;
+      leaderLatch = null;
       curatorFramework = null;
     }
 
     private boolean getLeaderAttempt() {
       Participant possibleLeader = null;
       try {
-        possibleLeader = leaderSelector.getLeader();
+        possibleLeader = leaderLatch.getLeader();
       } catch (KeeperException e) {
         log.debug("Exception throw in getLeaderAttempt()", e);
       } catch (Exception e) {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
index 0c7ddf07..0c35240f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -20,17 +20,21 @@
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.Histogram;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-import org.apache.curator.framework.recipes.leader.LeaderSelector;
-import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.fluo.accumulo.util.LongUtil;
 import org.apache.fluo.accumulo.util.ZookeeperPath;
@@ -41,6 +45,7 @@
 import org.apache.fluo.core.thrift.OracleService;
 import org.apache.fluo.core.thrift.Stamps;
 import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.fluo.core.util.FluoThreadFactory;
 import org.apache.fluo.core.util.Halt;
 import org.apache.fluo.core.util.HostUtil;
 import org.apache.fluo.core.util.PortUtils;
@@ -63,15 +68,14 @@
  * Oracle server is the responsible for providing incrementing logical timestamps to clients. It
  * should never give the same timestamp to two clients and it should always provide an incrementing
  * timestamp.
- * 
+ *
  * <p>
  * If multiple oracle servers are run, they will choose a leader and clients will automatically
  * connect to that leader. If the leader goes down, the client will automatically fail over to the
  * next leader. In the case where an oracle fails over, the next oracle will begin a new block of
  * timestamps.
  */
-public class OracleServer extends LeaderSelectorListenerAdapter
-    implements OracleService.Iface, PathChildrenCacheListener {
+public class OracleServer implements OracleService.Iface, PathChildrenCacheListener {
 
   private static final Logger log = LoggerFactory.getLogger(OracleServer.class);
 
@@ -88,7 +92,8 @@
   private volatile boolean started = false;
   private int port = 0;
 
-  private LeaderSelector leaderSelector;
+  private LeaderLatch leaderLatch;
+  private ExecutorService execService;
   private PathChildrenCache pathChildrenCache;
   private CuratorFramework curatorFramework;
   private CuratorCnxnListener cnxnListener;
@@ -253,7 +258,7 @@ private synchronized long getTimestampsImpl(String id, int num) throws TExceptio
   }
 
   @Override
-  public boolean isLeader() throws TException {
+  public boolean isLeader() {
     return isLeader;
   }
 
@@ -272,6 +277,8 @@ public boolean isConnected() {
   }
 
   private InetSocketAddress startServer() throws TTransportException {
+    Preconditions.checkState(
+        curatorFramework != null && curatorFramework.getState() == CuratorFrameworkState.STARTED);
 
     if (env.getConfiguration().containsKey(FluoConfigurationImpl.ORACLE_PORT_PROP)) {
       port = env.getConfiguration().getInt(FluoConfigurationImpl.ORACLE_PORT_PROP);
@@ -312,8 +319,6 @@ public synchronized void start() throws Exception {
       throw new IllegalStateException();
     }
 
-    final InetSocketAddress addr = startServer();
-
     curatorFramework = CuratorUtil.newAppCurator(env.getConfiguration());
     curatorFramework.getConnectionStateListenable().addListener(cnxnListener);
     curatorFramework.start();
@@ -322,11 +327,29 @@ public synchronized void start() throws Exception {
       Thread.sleep(200);
     }
 
-    leaderSelector = new LeaderSelector(curatorFramework, ZookeeperPath.ORACLE_SERVER, this);
+    final InetSocketAddress addr = startServer();
+
     String leaderId = HostUtil.getHostName() + ":" + addr.getPort();
-    leaderSelector.setId(leaderId);
+    leaderLatch = new LeaderLatch(curatorFramework, ZookeeperPath.ORACLE_SERVER, leaderId);
     log.info("Leader ID = " + leaderId);
-    leaderSelector.start();
+    execService = Executors.newSingleThreadExecutor(new FluoThreadFactory("Oracle Server Worker"));
+    leaderLatch.addListener(new LeaderLatchListener() {
+      @Override
+      public void notLeader() {
+        isLeader = false;
+
+        if (started) {
+          // if we stopped the server manually, we shouldn't halt
+          Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
+        }
+      }
+
+      @Override
+      public void isLeader() {
+        assumeLeadership();
+      }
+    }, execService);
+    leaderLatch.start();
 
     pathChildrenCache = new PathChildrenCache(curatorFramework, oraclePath, true);
     pathChildrenCache.getListenable().addListener(this);
@@ -341,9 +364,49 @@ public synchronized void start() throws Exception {
     started = true;
   }
 
+  private void assumeLeadership() {
+    Preconditions.checkState(!isLeader);
+
+    // sanity check- make sure previous oracle is no longer listening for connections
+    if (currentLeader != null) {
+      String[] address = currentLeader.getId().split(":");
+      String host = address[0];
+      int port = Integer.parseInt(address[1]);
+
+      OracleService.Client client = getOracleClient(host, port);
+      if (client != null) {
+        try {
+          while (client.isLeader()) {
+            Thread.sleep(500);
+          }
+        } catch (Exception e) {
+          log.debug("Exception thrown in takeLeadership()", e);
+        }
+      }
+    }
+
+    try {
+      synchronized (this) {
+        byte[] d = curatorFramework.getData().forPath(maxTsPath);
+        currentTs = maxTs = LongUtil.fromByteArray(d);
+      }
+
+      gcTsTracker = new GcTimestampTracker();
+      gcTsTracker.start();
+
+      isLeader = true;
+      log.info("Assumed leadership " + leaderLatch.getId());
+    } catch (Exception e) {
+      log.warn("Failed to become leader ", e);
+    }
+  }
+
   public synchronized void stop() throws Exception {
     if (started) {
 
+
+      isLeader = false;
+
       server.stop();
       serverThread.join();
 
@@ -357,8 +420,17 @@ public synchronized void stop() throws Exception {
       if (curatorFramework.getState().equals(CuratorFrameworkState.STARTED)) {
         pathChildrenCache.getListenable().removeListener(this);
         pathChildrenCache.close();
-        leaderSelector.close();
-        curatorFramework.getConnectionStateListenable().removeListener(this);
+        leaderLatch.close();
+
+        execService.shutdown();
+
+        execService.awaitTermination(10, TimeUnit.SECONDS);
+
+        curatorFramework.getConnectionStateListenable().removeListener(cnxnListener);
+
+        // leaderLatch.close() schedules a background delete, give it a chance to process before
+        // closing curator... this is done to avoid spurious exceptions, see CURATOR-467
+        Uninterruptibles.sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
         curatorFramework.close();
       }
       log.info("Oracle server has been stopped.");
@@ -381,58 +453,6 @@ public synchronized void stop() throws Exception {
     return null;
   }
 
-  /**
-   * Upon an oracle being elected the leader, it will need to adjust its starting timestamp to the
-   * last timestamp set in zookeeper.
-   *
-   * @param curatorFramework Curator framework
-   */
-  @Override
-  public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
-
-    try {
-      // sanity check- make sure previous oracle is no longer listening for connections
-      if (currentLeader != null) {
-        String[] address = currentLeader.getId().split(":");
-        String host = address[0];
-        int port = Integer.parseInt(address[1]);
-
-        OracleService.Client client = getOracleClient(host, port);
-        if (client != null) {
-          try {
-            while (client.isLeader()) {
-              Thread.sleep(500);
-            }
-          } catch (Exception e) {
-            log.debug("Exception thrown in takeLeadership()", e);
-          }
-        }
-      }
-
-      synchronized (this) {
-        byte[] d = curatorFramework.getData().forPath(maxTsPath);
-        currentTs = maxTs = LongUtil.fromByteArray(d);
-      }
-
-      gcTsTracker = new GcTimestampTracker();
-      gcTsTracker.start();
-
-      isLeader = true;
-
-      while (started) {
-        // if leadership is lost, then curator will interrupt the thread that called this method
-        Thread.sleep(100);
-      }
-    } finally {
-      isLeader = false;
-
-      if (started) {
-        // if we stopped the server manually, we shouldn't halt
-        Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
-      }
-    }
-  }
-
   @Override
   public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
       throws Exception {
@@ -442,8 +462,8 @@ public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent
           || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)
           || event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED))) {
         synchronized (this) {
-          Participant participant = leaderSelector.getLeader();
-          if (isLeader(participant) && !leaderSelector.hasLeadership()) {
+          Participant participant = leaderLatch.getLeader();
+          if (isLeader(participant) && !leaderLatch.hasLeadership()) {
             // in case current instance becomes leader, we want to know who came before it.
             currentLeader = participant;
           }
@@ -453,5 +473,4 @@ public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent
       log.warn("Oracle leadership watcher has been interrupted unexpectedly");
     }
   }
-
 }
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
index 7e689994..1aa2d944 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/CuratorUtil.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -89,7 +89,6 @@ public static CuratorFramework newRootFluoCurator(FluoConfiguration config) {
   public static CuratorFramework newCurator(String zookeepers, int timeout, String secret) {
 
     final ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);
-
     if (secret.isEmpty()) {
       return CuratorFrameworkFactory.newClient(zookeepers, timeout, timeout, retry);
     } else {
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
index ccd9e737..8c895ee5 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -26,6 +26,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Supplier;
 
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.oracle.OracleClient;
@@ -180,7 +181,7 @@ public void threadTest() throws Exception {
   @Test
   public void failover_newTimestampRequested() throws Exception {
 
-    sleepUntilConnected(oserver);
+    sleepUntil(oserver::isConnected);
 
     int port2 = PortUtils.getRandomFreePort();
     int port3 = PortUtils.getRandomFreePort();
@@ -189,10 +190,10 @@ public void failover_newTimestampRequested() throws Exception {
     TestOracle oserver3 = createExtraOracle(port3);
 
     oserver2.start();
-    sleepUntilConnected(oserver2);
+    sleepUntil(oserver2::isConnected);
 
     oserver3.start();
-    sleepUntilConnected(oserver3);
+    sleepUntil(oserver3::isConnected);
 
     OracleClient client = env.getSharedResources().getOracleClient();
 
@@ -205,15 +206,18 @@ public void failover_newTimestampRequested() throws Exception {
     assertTrue(client.getOracle().endsWith(Integer.toString(oserver.getPort())));
 
     oserver.stop();
-    sleepWhileConnected(oserver);
+    sleepWhile(oserver::isConnected);
+    sleepUntil(oserver2::isLeader);
 
     assertEquals(1002, client.getStamp().getTxTimestamp());
     assertTrue(client.getOracle().endsWith(Integer.toString(port2)));
 
     oserver2.stop();
-    sleepWhileConnected(oserver2);
+    sleepWhile(oserver2::isConnected);
     oserver2.close();
 
+    sleepUntil(oserver3::isLeader);
+
     assertEquals(2002, client.getStamp().getTxTimestamp());
     assertTrue(client.getOracle().endsWith(Integer.toString(port3)));
 
@@ -228,7 +232,7 @@ public void failover_newTimestampRequested() throws Exception {
   @Test
   public void singleOracle_goesAwayAndComesBack() throws Exception {
 
-    sleepUntilConnected(oserver);
+    sleepUntil(oserver::isConnected);
 
     OracleClient client = env.getSharedResources().getOracleClient();
 
@@ -239,7 +243,7 @@ public void singleOracle_goesAwayAndComesBack() throws Exception {
     }
 
     oserver.stop();
-    sleepWhileConnected(oserver);
+    sleepWhile(oserver::isConnected);
 
     while (client.getOracle() != null) {
       Thread.sleep(100);
@@ -248,7 +252,7 @@ public void singleOracle_goesAwayAndComesBack() throws Exception {
     assertNull(client.getOracle());
 
     oserver.start();
-    sleepUntilConnected(oserver);
+    sleepUntil(oserver::isConnected);
 
     assertEquals(1002, client.getStamp().getTxTimestamp());
 
@@ -273,12 +277,12 @@ public void threadFailoverTest() throws Exception {
     TestOracle oserver2 = createExtraOracle(port2);
 
     oserver2.start();
-    sleepUntilConnected(oserver2);
+    sleepUntil(oserver2::isConnected);
 
     TestOracle oserver3 = createExtraOracle(port3);
 
     oserver3.start();
-    sleepUntilConnected(oserver3);
+    sleepUntil(oserver3::isConnected);
 
     for (int i = 0; i < numThreads; i++) {
       tpool.execute(new TimestampFetcher(numTimes, env, output, cdl));
@@ -318,14 +322,12 @@ public void threadFailoverTest() throws Exception {
     oserver3.close();
   }
 
-  private void sleepWhileConnected(OracleServer oserver) throws InterruptedException {
-    while (oserver.isConnected()) {
-      Thread.sleep(100);
-    }
+  private void sleepUntil(Supplier<Boolean> condition) throws InterruptedException {
+    sleepWhile(() -> !condition.get());
   }
 
-  private void sleepUntilConnected(OracleServer oserver) throws InterruptedException {
-    while (!oserver.isConnected()) {
+  private void sleepWhile(Supplier<Boolean> condition) throws InterruptedException {
+    while (condition.get()) {
       Thread.sleep(100);
     }
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services