You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by km...@apache.org on 2019/07/17 10:12:29 UTC
[oozie] branch master updated: OOZIE-2755 Oozie HA:
ZKJobsConcurrencyService throws runtime exception when numOozies is
zero (asalamon74 via kmarton)
This is an automated email from the ASF dual-hosted git repository.
kmarton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 8ef5c1f OOZIE-2755 Oozie HA: ZKJobsConcurrencyService throws runtime exception when numOozies is zero (asalamon74 via kmarton)
8ef5c1f is described below
commit 8ef5c1febfa3cb99efd853062e066de7e964f09a
Author: kmarton <km...@apache.org>
AuthorDate: Wed Jul 17 12:12:11 2019 +0200
OOZIE-2755 Oozie HA: ZKJobsConcurrencyService throws runtime exception when numOozies is zero (asalamon74 via kmarton)
---
.../oozie/service/JobsConcurrencyService.java | 6 ++-
.../oozie/service/ZKJobsConcurrencyService.java | 25 +++++++---
.../service/TestZKJobsConcurrencyService.java | 58 +++++++++++++++++++++-
release-log.txt | 1 +
4 files changed, 81 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
index 2334f7e..cd86753 100644
--- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
@@ -101,8 +101,9 @@ public class JobsConcurrencyService implements Service, Instrumentable {
*
* @param jobId The jobId to check
* @return true
+ * @throws ServiceException never thrown in this implementation. Subclasses might throw this exception.
*/
- public boolean isJobIdForThisServer(String jobId) {
+ public boolean isJobIdForThisServer(String jobId) throws ServiceException {
return true;
}
@@ -111,8 +112,9 @@ public class JobsConcurrencyService implements Service, Instrumentable {
*
* @param ids The list of job ids to check
* @return ids
+ * @throws ServiceException never thrown in this implementation. Subclasses might throw this exception.
*/
- public List<String> getJobIdsForThisServer(List<String> ids) {
+ public List<String> getJobIdsForThisServer(List<String> ids) throws ServiceException {
return ids;
}
diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 75d088a..8f48ea5 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.x.discovery.ServiceInstance;
@@ -52,7 +53,8 @@ import org.apache.oozie.util.ZKUtils;
*/
public class ZKJobsConcurrencyService extends JobsConcurrencyService implements Service, Instrumentable {
- private ZKUtils zk;
+ @VisibleForTesting
+ ZKUtils zk;
// This pattern gives us the id number without the extra stuff
private static final Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*");
@@ -69,6 +71,7 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
public void init(Services services) throws ServiceException {
super.init(services);
try {
+
zk = ZKUtils.register(this);
leaderLatch = new LeaderLatch(zk.getClient(), ZKUtils.ZK_BASE_SERVICES_PATH + "/" + ZK_LEADER_PATH, zk.getZKId());
leaderLatch.start();
@@ -120,10 +123,11 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
*
* @param jobId The jobId to check
* @return true if this server should process this jobId; false if not
+ * @throws ServiceException if an error occurs while communicating with ZooKeeper
*/
@Override
- public boolean isJobIdForThisServer(String jobId) {
- List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
+ public boolean isJobIdForThisServer(String jobId) throws ServiceException {
+ List<ServiceInstance<Map>> oozies = getServiceInstances();
int numOozies = oozies.size();
int myIndex = zk.getZKIdIndex(oozies);
return checkJobIdForServer(jobId, numOozies, myIndex);
@@ -135,11 +139,12 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
*
* @param ids The list of job ids to check
* @return filteredIds a filtered list of job ids that this server should process
+ * @throws ServiceException if an error occurs while communicating with ZooKeeper
*/
@Override
- public List<String> getJobIdsForThisServer(List<String> ids) {
- List<String> filteredIds = new ArrayList<String>();
- List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
+ public List<String> getJobIdsForThisServer(List<String> ids) throws ServiceException {
+ List<String> filteredIds = new ArrayList<>();
+ List<ServiceInstance<Map>> oozies = getServiceInstances();
int numOozies = oozies.size();
int myIndex = zk.getZKIdIndex(oozies);
for(String id : ids) {
@@ -150,6 +155,14 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
return filteredIds;
}
+ private List<ServiceInstance<Map>> getServiceInstances() throws ServiceException {
+ List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
+ if (oozies == null || oozies.isEmpty()) {
+ throw new ServiceException(ErrorCode.E1700, "Empty oozies list");
+ }
+ return oozies;
+ }
+
/**
* Check if the jobId should be processed by the server with index myIndex when there are numOozies servers.
*
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
index 9108bf0..6eed32d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static junit.framework.Assert.assertEquals;
+import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.test.ZKXTestCase;
import org.apache.oozie.util.ConfigUtils;
@@ -175,6 +175,32 @@ public class TestZKJobsConcurrencyService extends ZKXTestCase {
}
}
+ public void testIsJobIdForThisServerBadZk() throws Exception {
+ ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
+ try {
+ zkjcs.init(Services.get());
+ assertTrue(zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W"));
+ assertTrue(zkjcs.isJobIdForThisServer("0000001-130521183438837-oozie-rkan-W"));
+ assertTrue(zkjcs.isJobIdForThisServer("0000002-130521183438837-oozie-rkan-W"));
+ assertTrue(zkjcs.isJobIdForThisServer("0000003-130521183438837-oozie-rkan-W"));
+ assertTrue(zkjcs.isJobIdForThisServer("0000004-130521183438837-oozie-rkan-W"));
+ assertTrue(zkjcs.isJobIdForThisServer("0000005-130521183438837-oozie-rkan-W"));
+ assertTrue(zkjcs.isJobIdForThisServer("0000006-130521183438837-oozie-rkan-W"));
+ assertTrue(zkjcs.isJobIdForThisServer("blah"));
+ zkjcs.zk.getClient().close(); // simulating zookeeper problem
+ sleep(1000); // Sleep to allow ZKUtils ServiceCache to update
+ try {
+ zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W");
+ fail("Expected ServletException");
+ } catch (ServiceException e) {
+ assertEquals(ErrorCode.E1700, e.getErrorCode());
+ }
+ }
+ finally {
+ zkjcs.destroy();
+ }
+ }
+
public void testGetJobIdsForThisServer() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
// We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence getJobIdsForThisServer()
@@ -235,6 +261,36 @@ public class TestZKJobsConcurrencyService extends ZKXTestCase {
}
}
+ public void testGetJobIdsForThisServerBadZk() throws Exception {
+ ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
+ try {
+ zkjcs.init(Services.get());
+ List<String> ids = new ArrayList<String>();
+ ids.add("0000000-130521183438837-oozie-rkan-W");
+ ids.add("0000001-130521183438837-oozie-rkan-W");
+ ids.add("0000002-130521183438837-oozie-rkan-W");
+ ids.add("0000003-130521183438837-oozie-rkan-W");
+ ids.add("0000004-130521183438837-oozie-rkan-W");
+ ids.add("0000005-130521183438837-oozie-rkan-W");
+ ids.add("0000006-130521183438837-oozie-rkan-W");
+ ids.add("blah");
+ List<String> ids2 = zkjcs.getJobIdsForThisServer(ids);
+ assertEquals(8, ids2.size());
+ assertTrue(ids2.containsAll(ids));
+ zkjcs.zk.getClient().close(); // simulating zookeeper problem
+ sleep(1000); // Sleep to allow ZKUtils ServiceCache to update
+ try {
+ zkjcs.getJobIdsForThisServer(ids);
+ fail("Expected ServletException");
+ } catch (ServiceException e) {
+ assertEquals(ErrorCode.E1700, e.getErrorCode());
+ }
+ }
+ finally {
+ zkjcs.destroy();
+ }
+ }
+
public void testGetServerUrls() throws Exception {
ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
// We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence getServerUrls()
diff --git a/release-log.txt b/release-log.txt
index 8e0a18a..86693cc 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.2.0 release (trunk - unreleased)
+OOZIE-2755 Oozie HA: ZKJobsConcurrencyService throws runtime exception when numOozies is zero (asalamon74 via kmarton)
OOZIE-3527 Oozie stuck in waiting state if CoordPushDependencyCheckXCommand is not requeued (mgogineni via rohini)
OOZIE-3524 fs:fileSize() does not work correctly for files with extra slash in path (mgogineni via asalamon74)
OOZIE-3523 First missing dependency is shown incorrectly (mgogineni via asalamon74)