You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by km...@apache.org on 2019/07/17 10:12:29 UTC

[oozie] branch master updated: OOZIE-2755 Oozie HA: ZKJobsConcurrencyService throws runtime exception when numOozies is zero(asalamon74 via kmarton)

This is an automated email from the ASF dual-hosted git repository.

kmarton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ef5c1f  OOZIE-2755 Oozie HA: ZKJobsConcurrencyService throws runtime exception when numOozies is zero(asalamon74 via kmarton)
8ef5c1f is described below

commit 8ef5c1febfa3cb99efd853062e066de7e964f09a
Author: kmarton <km...@apache.org>
AuthorDate: Wed Jul 17 12:12:11 2019 +0200

    OOZIE-2755 Oozie HA: ZKJobsConcurrencyService throws runtime exception when numOozies is zero(asalamon74 via kmarton)
---
 .../oozie/service/JobsConcurrencyService.java      |  6 ++-
 .../oozie/service/ZKJobsConcurrencyService.java    | 25 +++++++---
 .../service/TestZKJobsConcurrencyService.java      | 58 +++++++++++++++++++++-
 release-log.txt                                    |  1 +
 4 files changed, 81 insertions(+), 9 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
index 2334f7e..cd86753 100644
--- a/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/JobsConcurrencyService.java
@@ -101,8 +101,9 @@ public class JobsConcurrencyService implements Service, Instrumentable {
      *
      * @param jobId The jobId to check
      * @return true
+     * @throws ServiceException never thrown in this implementation. Subclasses might throw this exception.
      */
-    public boolean isJobIdForThisServer(String jobId) {
+    public boolean isJobIdForThisServer(String jobId) throws ServiceException {
         return true;
     }
 
@@ -111,8 +112,9 @@ public class JobsConcurrencyService implements Service, Instrumentable {
      *
      * @param ids The list of job ids to check
      * @return ids
+     * @throws ServiceException never thrown in this implementation. Subclasses might throw this exception.
      */
-    public List<String> getJobIdsForThisServer(List<String> ids) {
+    public List<String> getJobIdsForThisServer(List<String> ids) throws ServiceException {
         return ids;
     }
 
diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 75d088a..8f48ea5 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.x.discovery.ServiceInstance;
@@ -52,7 +53,8 @@ import org.apache.oozie.util.ZKUtils;
  */
 public class ZKJobsConcurrencyService extends JobsConcurrencyService implements Service, Instrumentable {
 
-    private ZKUtils zk;
+    @VisibleForTesting
+    ZKUtils zk;
 
     // This pattern gives us the id number without the extra stuff
     private static final Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*");
@@ -69,6 +71,7 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
     public void init(Services services) throws ServiceException {
         super.init(services);
         try {
+
             zk = ZKUtils.register(this);
             leaderLatch = new LeaderLatch(zk.getClient(), ZKUtils.ZK_BASE_SERVICES_PATH + "/" + ZK_LEADER_PATH, zk.getZKId());
             leaderLatch.start();
@@ -120,10 +123,11 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
      *
      * @param jobId The jobId to check
      * @return true if this server should process this jobId; false if not
+     * @throws ServiceException if an error occurred while communicating with ZooKeeper
      */
     @Override
-    public boolean isJobIdForThisServer(String jobId) {
-        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
+    public boolean isJobIdForThisServer(String jobId) throws ServiceException {
+        List<ServiceInstance<Map>> oozies = getServiceInstances();
         int numOozies = oozies.size();
         int myIndex = zk.getZKIdIndex(oozies);
         return checkJobIdForServer(jobId, numOozies, myIndex);
@@ -135,11 +139,12 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
      *
      * @param ids The list of job ids to check
      * @return filteredIds a filtered list of job ids that this server should process
+     * @throws ServiceException if an error occurred while communicating with ZooKeeper
      */
     @Override
-    public List<String> getJobIdsForThisServer(List<String> ids) {
-        List<String> filteredIds = new ArrayList<String>();
-        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
+    public List<String> getJobIdsForThisServer(List<String> ids) throws ServiceException {
+        List<String> filteredIds = new ArrayList<>();
+        List<ServiceInstance<Map>> oozies = getServiceInstances();
         int numOozies = oozies.size();
         int myIndex = zk.getZKIdIndex(oozies);
         for(String id : ids) {
@@ -150,6 +155,14 @@ public class ZKJobsConcurrencyService extends JobsConcurrencyService implements
         return filteredIds;
     }
 
+    private List<ServiceInstance<Map>> getServiceInstances() throws ServiceException {
+        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
+        if (oozies == null || oozies.isEmpty()) {
+            throw new ServiceException(ErrorCode.E1700, "Empty oozies list");
+        }
+        return oozies;
+    }
+
     /**
      * Check if the jobId should be processed by the server with index myIndex when there are numOozies servers.
      *
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
index 9108bf0..6eed32d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKJobsConcurrencyService.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import static junit.framework.Assert.assertEquals;
 
+import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.test.ZKXTestCase;
 import org.apache.oozie.util.ConfigUtils;
@@ -175,6 +175,32 @@ public class TestZKJobsConcurrencyService extends ZKXTestCase {
         }
     }
 
+    public void testIsJobIdForThisServerBadZk() throws Exception {
+        ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
+        try {
+            zkjcs.init(Services.get());
+            assertTrue(zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W"));
+            assertTrue(zkjcs.isJobIdForThisServer("0000001-130521183438837-oozie-rkan-W"));
+            assertTrue(zkjcs.isJobIdForThisServer("0000002-130521183438837-oozie-rkan-W"));
+            assertTrue(zkjcs.isJobIdForThisServer("0000003-130521183438837-oozie-rkan-W"));
+            assertTrue(zkjcs.isJobIdForThisServer("0000004-130521183438837-oozie-rkan-W"));
+            assertTrue(zkjcs.isJobIdForThisServer("0000005-130521183438837-oozie-rkan-W"));
+            assertTrue(zkjcs.isJobIdForThisServer("0000006-130521183438837-oozie-rkan-W"));
+            assertTrue(zkjcs.isJobIdForThisServer("blah"));
+            zkjcs.zk.getClient().close(); // simulating zookeeper problem
+            sleep(1000);    // Sleep to allow ZKUtils ServiceCache to update
+            try {
+                zkjcs.isJobIdForThisServer("0000000-130521183438837-oozie-rkan-W");
+                fail("Expected ServletException");
+            } catch (ServiceException e) {
+                assertEquals(ErrorCode.E1700, e.getErrorCode());
+            }
+        }
+        finally {
+            zkjcs.destroy();
+        }
+    }
+
     public void testGetJobIdsForThisServer() throws Exception {
         ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
         // We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence getJobIdsForThisServer()
@@ -235,6 +261,36 @@ public class TestZKJobsConcurrencyService extends ZKXTestCase {
         }
     }
 
+    public void testGetJobIdsForThisServerBadZk() throws Exception {
+        ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
+        try {
+            zkjcs.init(Services.get());
+            List<String> ids = new ArrayList<String>();
+            ids.add("0000000-130521183438837-oozie-rkan-W");
+            ids.add("0000001-130521183438837-oozie-rkan-W");
+            ids.add("0000002-130521183438837-oozie-rkan-W");
+            ids.add("0000003-130521183438837-oozie-rkan-W");
+            ids.add("0000004-130521183438837-oozie-rkan-W");
+            ids.add("0000005-130521183438837-oozie-rkan-W");
+            ids.add("0000006-130521183438837-oozie-rkan-W");
+            ids.add("blah");
+            List<String> ids2 = zkjcs.getJobIdsForThisServer(ids);
+            assertEquals(8, ids2.size());
+            assertTrue(ids2.containsAll(ids));
+            zkjcs.zk.getClient().close(); // simulating zookeeper problem
+            sleep(1000);    // Sleep to allow ZKUtils ServiceCache to update
+            try {
+                zkjcs.getJobIdsForThisServer(ids);
+                fail("Expected ServletException");
+            } catch (ServiceException e) {
+                assertEquals(ErrorCode.E1700, e.getErrorCode());
+            }
+        }
+        finally {
+            zkjcs.destroy();
+        }
+    }
+
     public void testGetServerUrls() throws Exception {
         ZKJobsConcurrencyService zkjcs = new ZKJobsConcurrencyService();
         // We'll use some DummyZKXOozies here to pretend to be other Oozie servers that will influence getServerUrls()
diff --git a/release-log.txt b/release-log.txt
index 8e0a18a..86693cc 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.2.0 release (trunk - unreleased)
 
+OOZIE-2755 Oozie HA: ZKJobsConcurrencyService throws runtime exception when numOozies is zero(asalamon74 via kmarton)
 OOZIE-3527 Oozie stuck in waiting state if CoordPushDependencyCheckXCommand is not requeued (mgogineni via rohini)
 OOZIE-3524 fs:fileSize() does not work correctly for files with extra slash in path (mgogineni via asalamon74)
 OOZIE-3523 First missing dependency is shown incorrectly (mgogineni via asalamon74)