You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by jk...@apache.org on 2020/03/07 02:46:08 UTC

[fluo] branch master updated: Use LeaderLatch to determine if oracle exists (#1088)

This is an automated email from the ASF dual-hosted git repository.

jkoshakow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new f2cf203  Use LeaderLatch to determine if oracle exists (#1088)
f2cf203 is described below

commit f2cf203fb2901769c2c6035af11a4310af8ac19a
Author: Joseph Koshakow <jk...@users.noreply.github.com>
AuthorDate: Fri Mar 6 21:46:00 2020 -0500

    Use LeaderLatch to determine if oracle exists (#1088)
    
    * Use LeaderLatch to determine if oracle exists
    
    The old implementation of oracleExists checks to see if there are
    more than 1 ZNodes under the ZookeeperPath.ORACLE_SERVER path. This
    relies on the LeaderLatch implementation and uses a path given to
    a Curator recipe, which is advised against by the Curator docs.
    See https://cwiki.apache.org/confluence/display/CURATOR/TN7.
    
    The new implementation utilizes LeaderLatch to count the number
    of participants to see if there's more than 0.
---
 .../org/apache/fluo/core/client/FluoAdminImpl.java | 29 ++++++++--------------
 .../fluo/integration/client/FluoAdminImplIT.java   | 28 +++++++++++++++++++++
 .../apache/fluo/mapreduce/FluoOutputFormat.java    |  3 +--
 3 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 68ba8ba..c705792 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -335,7 +335,6 @@ public class FluoAdminImpl implements FluoAdmin {
       CuratorFramework curator = getAppCurator();
       ObserverUtil.initialize(curator, config);
 
-
       sharedProps.store(baos, "Shared java props");
 
       CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
@@ -505,32 +504,26 @@ public class FluoAdminImpl implements FluoAdmin {
   }
 
   public static boolean oracleExists(CuratorFramework curator) {
-    try {
-      return curator.checkExists().forPath(ZookeeperPath.ORACLE_SERVER) != null
-          && !curator.getChildren().forPath(ZookeeperPath.ORACLE_SERVER).isEmpty();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    return numOracles(curator) > 0;
   }
 
   public boolean oracleExists() {
     return oracleExists(getAppCurator());
   }
 
-  public int numOracles() {
-    CuratorFramework curator = getAppCurator();
-    if (oracleExists(curator)) {
-      try {
-        LeaderLatch leaderLatch = new LeaderLatch(curator, ZookeeperPath.ORACLE_SERVER);
-        return leaderLatch.getParticipants().size();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      return 0;
+  private static int numOracles(CuratorFramework curator) {
+    try {
+      LeaderLatch leaderLatch = new LeaderLatch(curator, ZookeeperPath.ORACLE_SERVER);
+      return leaderLatch.getParticipants().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
+  public int numOracles() {
+    return numOracles(getAppCurator());
+  }
+
   public static int numWorkers(CuratorFramework curator) {
     int numWorkers = 0;
     try {
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 2d58c81..6c34be3 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
@@ -49,6 +50,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -286,4 +289,29 @@ public class FluoAdminImplIT extends ITBaseImpl {
       Assert.assertEquals(0, admin.numOracles());
     }
   }
+
+  @Test
+  public void testNumOraclesWithMissingOraclePath() throws Exception {
+    oserver.stop();
+    try (CuratorFramework curator = CuratorUtil.newAppCurator(config);
+        FluoAdminImpl admin = new FluoAdminImpl(config)) {
+      curator.start();
+      oserver.awaitLeaderElection(3, TimeUnit.SECONDS);
+      curator.delete().forPath(ZookeeperPath.ORACLE_SERVER);
+
+      assertEquals(0, admin.numOracles());
+    }
+  }
+
+  @Test
+  public void testOracleExists() throws Exception {
+    try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
+      assertTrue(admin.oracleExists());
+
+      oserver.stop();
+      oserver.awaitLeaderElection(3, TimeUnit.SECONDS);
+
+      assertFalse(admin.oracleExists());
+    }
+  }
 }
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
index e0b9cf9..e0bd83f 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
@@ -43,8 +43,7 @@ public class FluoOutputFormat extends OutputFormat<Loader, NullWritable> {
   private static String PROPS_CONF_KEY = FluoOutputFormat.class.getName() + ".props";
 
   @Override
-  public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException {
-  }
+  public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException {}
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)