You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@fluo.apache.org by jk...@apache.org on 2020/03/07 02:46:08 UTC
[fluo] branch master updated: Use LeaderLatch to determine if
oracle exists (#1088)
This is an automated email from the ASF dual-hosted git repository.
jkoshakow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push:
new f2cf203 Use LeaderLatch to determine if oracle exists (#1088)
f2cf203 is described below
commit f2cf203fb2901769c2c6035af11a4310af8ac19a
Author: Joseph Koshakow <jk...@users.noreply.github.com>
AuthorDate: Fri Mar 6 21:46:00 2020 -0500
Use LeaderLatch to determine if oracle exists (#1088)
* Use LeaderLatch to determine if oracle exists
The old implementation of oracleExists checks to see if there is
at least one ZNode under the ZookeeperPath.ORACLE_SERVER path. This
relies on the LeaderLatch implementation and uses a path given to
a Curator recipe, which is advised against by the Curator docs.
See https://cwiki.apache.org/confluence/display/CURATOR/TN7.
The new implementation uses LeaderLatch to count the number of
participants and reports that an oracle exists when that count is greater than 0.
---
.../org/apache/fluo/core/client/FluoAdminImpl.java | 29 ++++++++--------------
.../fluo/integration/client/FluoAdminImplIT.java | 28 +++++++++++++++++++++
.../apache/fluo/mapreduce/FluoOutputFormat.java | 3 +--
3 files changed, 40 insertions(+), 20 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 68ba8ba..c705792 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -335,7 +335,6 @@ public class FluoAdminImpl implements FluoAdmin {
CuratorFramework curator = getAppCurator();
ObserverUtil.initialize(curator, config);
-
sharedProps.store(baos, "Shared java props");
CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
@@ -505,32 +504,26 @@ public class FluoAdminImpl implements FluoAdmin {
}
public static boolean oracleExists(CuratorFramework curator) {
- try {
- return curator.checkExists().forPath(ZookeeperPath.ORACLE_SERVER) != null
- && !curator.getChildren().forPath(ZookeeperPath.ORACLE_SERVER).isEmpty();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ return numOracles(curator) > 0;
}
public boolean oracleExists() {
return oracleExists(getAppCurator());
}
- public int numOracles() {
- CuratorFramework curator = getAppCurator();
- if (oracleExists(curator)) {
- try {
- LeaderLatch leaderLatch = new LeaderLatch(curator, ZookeeperPath.ORACLE_SERVER);
- return leaderLatch.getParticipants().size();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- } else {
- return 0;
+ private static int numOracles(CuratorFramework curator) {
+ try {
+ LeaderLatch leaderLatch = new LeaderLatch(curator, ZookeeperPath.ORACLE_SERVER);
+ return leaderLatch.getParticipants().size();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
+ public int numOracles() {
+ return numOracles(getAppCurator());
+ }
+
public static int numWorkers(CuratorFramework curator) {
int numWorkers = 0;
try {
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 2d58c81..6c34be3 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.accumulo.util.ZookeeperUtil;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
@@ -49,6 +50,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -286,4 +289,29 @@ public class FluoAdminImplIT extends ITBaseImpl {
Assert.assertEquals(0, admin.numOracles());
}
}
+
+ @Test
+ public void testNumOraclesWithMissingOraclePath() throws Exception {
+ oserver.stop();
+ try (CuratorFramework curator = CuratorUtil.newAppCurator(config);
+ FluoAdminImpl admin = new FluoAdminImpl(config)) {
+ curator.start();
+ oserver.awaitLeaderElection(3, TimeUnit.SECONDS);
+ curator.delete().forPath(ZookeeperPath.ORACLE_SERVER);
+
+ assertEquals(0, admin.numOracles());
+ }
+ }
+
+ @Test
+ public void testOracleExists() throws Exception {
+ try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
+ assertTrue(admin.oracleExists());
+
+ oserver.stop();
+ oserver.awaitLeaderElection(3, TimeUnit.SECONDS);
+
+ assertFalse(admin.oracleExists());
+ }
+ }
}
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
index e0b9cf9..e0bd83f 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
@@ -43,8 +43,7 @@ public class FluoOutputFormat extends OutputFormat<Loader, NullWritable> {
private static String PROPS_CONF_KEY = FluoOutputFormat.class.getName() + ".props";
@Override
- public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException {
- }
+ public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException {}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)