You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by jk...@apache.org on 2020/03/04 21:31:56 UTC

[fluo] branch master updated: Add Oracle count functionality (#1087)

This is an automated email from the ASF dual-hosted git repository.

jkoshakow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ecd456  Add Oracle count functionality (#1087)
4ecd456 is described below

commit 4ecd4560e76a6ac44f7e80f75f5e42dcad86dddf
Author: Joseph Koshakow <jk...@users.noreply.github.com>
AuthorDate: Wed Mar 4 16:31:44 2020 -0500

    Add Oracle count functionality (#1087)
    
    * Add Oracle count functionality
    
    Add functionality to FluoAdminImpl to count the number of Oracles
    started. This allows the 'fluo list' command to print the number
    of Oracles for each app.
    
    Fixes #895
---
 .../main/java/org/apache/fluo/command/FluoList.java |  7 ++++---
 .../java/org/apache/fluo/command/FluoOracle.java    |  2 --
 .../org/apache/fluo/core/client/FluoAdminImpl.java  | 15 +++++++++++++++
 .../org/apache/fluo/core/oracle/OracleServer.java   |  5 +++++
 .../fluo/integration/client/FluoAdminImplIT.java    | 21 +++++++++++++++++++++
 5 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoList.java b/modules/command/src/main/java/org/apache/fluo/command/FluoList.java
index a430d0c..7a24922 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoList.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoList.java
@@ -51,8 +51,8 @@ public class FluoList extends ConfigCommand {
 
       System.out.println("Fluo instance (" + config.getInstanceZookeepers() + ") contains "
           + children.size() + " application(s)\n");
-      System.out.println("Application     Status     # Workers");
-      System.out.println("-----------     ------     ---------");
+      System.out.println("Application     Status     # Workers     # Oracles");
+      System.out.println("-----------     ------     ---------     ---------");
 
       for (String path : children) {
         listApp(config, path);
@@ -91,7 +91,8 @@ public class FluoList extends ConfigCommand {
         state = "RUNNING";
       }
       int numWorkers = admin.numWorkers();
-      System.out.format("%-15s %-11s %4d\n", path, state, numWorkers);
+      int numOracles = admin.numOracles();
+      System.out.format("%-15s %-11s %4d %13d\n", path, state, numWorkers, numOracles);
     }
   }
 }
diff --git a/modules/command/src/main/java/org/apache/fluo/command/FluoOracle.java b/modules/command/src/main/java/org/apache/fluo/command/FluoOracle.java
index 0f2f896..b03606f 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoOracle.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoOracle.java
@@ -19,8 +19,6 @@ import com.beust.jcommander.Parameters;
 import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.core.util.UtilWaitThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Parameters(commandNames = "oracle", commandDescription = "Starts Fluo Oracle process for <app>")
 public class FluoOracle extends AppCommand {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index f43e83a..68ba8ba 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.fluo.accumulo.iterators.GarbageCollectionIterator;
 import org.apache.fluo.accumulo.iterators.NotificationIterator;
 import org.apache.fluo.accumulo.summarizer.FluoSummarizer;
@@ -516,6 +517,20 @@ public class FluoAdminImpl implements FluoAdmin {
     return oracleExists(getAppCurator());
   }
 
+  public int numOracles() {
+    CuratorFramework curator = getAppCurator();
+    if (oracleExists(curator)) {
+      try {
+        LeaderLatch leaderLatch = new LeaderLatch(curator, ZookeeperPath.ORACLE_SERVER);
+        return leaderLatch.getParticipants().size();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return 0;
+    }
+  }
+
   public static int numWorkers(CuratorFramework curator) {
     int numWorkers = 0;
     try {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
index b399159..71ad696 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
@@ -466,4 +466,9 @@ public class OracleServer implements OracleService.Iface, PathChildrenCacheListe
       log.warn("Oracle leadership watcher has been interrupted unexpectedly");
     }
   }
+
+  @VisibleForTesting
+  public void awaitLeaderElection(long timeout, TimeUnit timeUnit) throws InterruptedException {
+    leaderLatch.await(timeout, timeUnit);
+  }
 }
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 87996e7..2d58c81 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -18,6 +18,7 @@ package org.apache.fluo.integration.client;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Iterables;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -265,4 +266,24 @@ public class FluoAdminImplIT extends ITBaseImpl {
     env2.close();
 
   }
+
+  @Test
+  public void testNumOracles() throws Exception {
+    try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
+      Assert.assertEquals(1, admin.numOracles());
+
+      OracleServer oserver2 = new OracleServer(env);
+      oserver2.start();
+      oserver2.awaitLeaderElection(3, TimeUnit.SECONDS);
+      Assert.assertEquals(2, admin.numOracles());
+
+      oserver2.stop();
+      oserver2.awaitLeaderElection(3, TimeUnit.SECONDS);
+      Assert.assertEquals(1, admin.numOracles());
+
+      oserver.stop();
+      oserver.awaitLeaderElection(3, TimeUnit.SECONDS);
+      Assert.assertEquals(0, admin.numOracles());
+    }
+  }
 }