You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/06/12 12:38:56 UTC

oozie git commit: OOZIE-3260 [sla] Remove stale item above max retries on JPA related errors from in-memory SLA map (andras.piros)

Repository: oozie
Updated Branches:
  refs/heads/master 550850da7 -> 87b3d0fff


OOZIE-3260 [sla] Remove stale item above max retries on JPA related errors from in-memory SLA map (andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/87b3d0ff
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/87b3d0ff
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/87b3d0ff

Branch: refs/heads/master
Commit: 87b3d0fff47de3e25251a0b667050a8ffc36f615
Parents: 550850d
Author: Andras Piros <an...@cloudera.com>
Authored: Tue Jun 12 14:37:14 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Tue Jun 12 14:37:14 2018 +0200

----------------------------------------------------------------------
 .../org/apache/oozie/sla/SLACalcStatus.java     |  13 +-
 .../apache/oozie/sla/SLACalculatorMemory.java   |  53 +++++--
 .../apache/oozie/sla/service/SLAService.java    |   1 +
 core/src/main/resources/oozie-default.xml       |  11 ++
 .../org/apache/oozie/sla/TestSLACalcStatus.java |  41 ++++++
 .../oozie/sla/TestSLACalculatorMemory.java      | 145 +++++++++++++++++--
 docs/src/site/twiki/DG_SLAMonitoring.twiki      |  11 ++
 release-log.txt                                 |   1 +
 8 files changed, 257 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
index 7be16f0..3d7d6e8 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.oozie.sla;
 
 import java.util.Date;
@@ -46,6 +45,7 @@ public class SLACalcStatus extends SLAEvent {
     private Date lastModifiedTime;
     private byte eventProcessed;
     private String jobId;
+    private int retryCount = 0;
 
     private XLog LOG;
 
@@ -307,4 +307,15 @@ public class SLACalcStatus extends SLAEvent {
         setSLARegistrationBean(reg);
     }
 
+    int getRetryCount() {
+        return retryCount;
+    }
+
+    void incrementRetryCount() {
+        this.retryCount++;
+    }
+
+    void resetRetryCount() {
+        this.retryCount = 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index a6ed0ff..c7ccfcc 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -74,7 +74,7 @@ public class SLACalculatorMemory implements SLACalculator {
 
     private static XLog LOG = XLog.getLog(SLACalculatorMemory.class);
     // TODO optimization priority based insertion/processing/bumping up-down
-    protected Map<String, SLACalcStatus> slaMap;
+    private Map<String, SLACalcStatus> slaMap;
     protected Set<String> historySet;
     private static int capacity;
     private static JPAService jpaService;
@@ -84,11 +84,13 @@ public class SLACalculatorMemory implements SLACalculator {
     private Instrumentation instrumentation;
     public static final String INSTRUMENTATION_GROUP = "sla-calculator";
     public static final String SLA_MAP = "sla-map";
+    private int maxRetryCount;
 
     @Override
     public void init(Configuration conf) throws ServiceException {
         capacity = ConfigurationService.getInt(conf, SLAService.CONF_CAPACITY);
         jobEventLatency = ConfigurationService.getInt(conf, SLAService.CONF_JOB_EVENT_LATENCY);
+        maxRetryCount = ConfigurationService.getInt(conf, SLAService.CONF_MAXIMUM_RETRY_COUNT);
         slaMap = new ConcurrentHashMap<String, SLACalcStatus>();
         historySet = Collections.synchronizedSet(new HashSet<String>());
         jpaService = Services.get().get(JPAService.class);
@@ -218,9 +220,14 @@ public class SLACalculatorMemory implements SLACalculator {
     }
 
     /**
-     * Invoked via periodic run, update the SLA for registered jobs
+     * Invoked via periodic run, update the SLA for registered jobs.
+     * <p>
+     * Track the number of times the {@link SLACalcStatus} entry has not been processed successfully, and when a preconfigured
+     * {@code oozie.sla.service.SLAService.maximum.retry.count} is reached, remove any {@link SLACalculatorMemory#slaMap} entries
+     * that are causing {@code JPAExecutorException}s of certain {@link ErrorCode}s.
+     * @param jobId the workflow or coordinator job or action ID the SLA is tracked against
      */
-    protected void updateJobSla(String jobId) throws Exception {
+    void updateJobSla(String jobId) throws Exception {
         SLACalcStatus slaCalc = slaMap.get(jobId);
 
         if (slaCalc == null) {
@@ -231,13 +238,16 @@ public class SLACalculatorMemory implements SLACalculator {
         // get eventProcessed on DB for validation in HA
         SLASummaryBean summaryBean = null;
         try {
-            summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance())
+            summaryBean = SLASummaryQueryExecutor.getInstance()
                     .get(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
-        }
-        catch (JPAExecutorException e) {
-            if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) {
-                LOG.debug("job [{0}] is is not in DB, removing from Memory", jobId);
-                removeAndDecrement(jobId);
+            resetRetryCount(jobId);
+        }
+        catch (final JPAExecutorException e) {
+            if (e.getErrorCode().equals(ErrorCode.E0603)
+                    || e.getErrorCode().equals(ErrorCode.E0604)
+                    || e.getErrorCode().equals(ErrorCode.E0605)) {
+                LOG.debug("job [{0}] is not in DB, removing from Memory", jobId);
+                incrementRetryCountAndRemove(jobId);
                 return;
             }
             throw e;
@@ -689,4 +699,29 @@ public class SLACalculatorMemory implements SLACalculator {
         LOG.trace("Tried to remove a non-existing item from SLA map. [jobId={0}]", jobId);
         return false;
     }
+
+    private void resetRetryCount(final String jobId) {
+        if (slaMap.containsKey(jobId)) {
+            LOG.debug("Resetting retry count on [{0}]", jobId);
+            final SLACalcStatus existingStatus = slaMap.get(jobId);
+            existingStatus.resetRetryCount();
+            putAndIncrement(jobId, existingStatus);
+        }
+    }
+
+    private void incrementRetryCountAndRemove(final String jobId) {
+        LOG.debug("Checking SLA calculator status [{0}] for retry count", jobId);
+        if (slaMap.containsKey(jobId)) {
+            final SLACalcStatus existingStatus = slaMap.get(jobId);
+            if (existingStatus.getRetryCount() < maxRetryCount) {
+                existingStatus.incrementRetryCount();
+                LOG.debug("Retrying with SLA calculator status [{0}] retry count [{1}]", jobId, existingStatus.getRetryCount());
+                putAndIncrement(jobId, existingStatus);
+            }
+            else {
+                LOG.debug("Removing [{0}] from SLA map as maximum retry count reached", jobId);
+                removeAndDecrement(jobId);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
index 2d23a22..646fe29 100644
--- a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
+++ b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java
@@ -53,6 +53,7 @@ public class SLAService implements Service {
     public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay";
     public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout";
     public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval";
+    public static final String CONF_MAXIMUM_RETRY_COUNT = CONF_PREFIX + "maximum.retry.count";
 
     private static SLACalculator calcImpl;
     private static boolean slaEnabled = false;

http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 3d627be..8d7465c 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2566,6 +2566,17 @@ will be the requeue interval for the actions which are waiting for a long time w
         </description>
     </property>
 
+    <property>
+        <name>oozie.sla.service.SLAService.maximum.retry.count</name>
+        <value>3</value>
+        <description>
+            Maximum number of times an SLA calculator status update is retried when certain database related errors occur.
+            It's possible that WorkflowJobBean / CoordActionBean instances being inserted don't yet have their SLA_SUMMARY entries
+            in the database when the SLACalcStatus entries inside SLACalculatorMemory#slaMap are checked; after too many retries
+            such entries are removed from memory, and thus no SLA will be tracked. In those rare cases, increase this value.
+        </description>
+    </property>
+
     <!-- ZooKeeper configuration -->
     <property>
         <name>oozie.zookeeper.connection.string</name>

http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/test/java/org/apache/oozie/sla/TestSLACalcStatus.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalcStatus.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalcStatus.java
new file mode 100644
index 0000000..85cf9b3
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalcStatus.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.sla;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSLACalcStatus {
+
+    @Test
+    public void testRetryCountOperations() {
+        final SLACalcStatus status = new SLACalcStatus();
+        assertEquals("initial retryCount mismatch", 0, status.getRetryCount());
+
+        status.incrementRetryCount();
+        status.incrementRetryCount();
+        status.incrementRetryCount();
+        assertEquals("retryCount mismatch after increments", 3, status.getRetryCount());
+
+        status.resetRetryCount();
+        assertEquals("retryCount mismatch after reset", 0, status.getRetryCount());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
index f340137..4a8e8e6 100644
--- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
+++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
@@ -63,6 +63,7 @@ import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.sla.service.SLAService;
 import org.apache.oozie.test.XDataTestCase;
@@ -1107,7 +1108,7 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         SLARegistrationBean slaRegBean = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
 
         try {
-            FailingDBHelperForTest.setDbPredicate(new SLARegistrationDmlPredicate());
+            FailingDBHelperForTest.setDbPredicate(new SLARegistrationInsertUpdatePredicate());
             prepareFailingDB();
             slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
             fail("Expected JPAExecutorException not thrown");
@@ -1154,7 +1155,99 @@ public class TestSLACalculatorMemory extends XDataTestCase {
             FailingDBHelperForTest.resetDbPredicate();
             System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER);
         }
+    }
 
+    public void testWhenSLARegistrationExistsWithoutSLASummaryUpdateSLARetries() throws Exception {
+        final SLACalculatorMemory slaCalculatorMemory = new SLACalculatorMemory();
+        slaCalculatorMemory.init(Services.get().get(ConfigurationService.class).getConf());
+        final String jobId = "job-1-W";
+        final SLARegistrationBean slaRegistration = _createSLARegistration(jobId, AppType.WORKFLOW_JOB);
+        slaCalculatorMemory.addRegistration(jobId, slaRegistration);
+
+        updateJobSlaFailing(slaCalculatorMemory, jobId,
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Assert.assertNotNull("after first update, SLACalcStatus should still be present",
+                                    slaCalculatorMemory.get(slaRegistration.getId()));
+                            Assert.assertEquals("updating SLA_REGISTRATION should have been failed",
+                                    slaRegistration,
+                                    slaCalculatorMemory.get(jobId).getSLARegistrationBean());
+                            Assert.assertEquals("SLACalcStatus.retryCount should have been increased",
+                                    1, slaCalculatorMemory.get(jobId).getRetryCount());
+                        } catch (JPAExecutorException ignored) {
+                        }
+                    }
+                });
+
+        updateJobSlaFailing(slaCalculatorMemory, jobId,
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Assert.assertNotNull("after second update, SLACalcStatus should still be present",
+                                    slaCalculatorMemory.get(slaRegistration.getId()));
+                            Assert.assertEquals("updating SLA_REGISTRATION should have been failed",
+                                    slaRegistration,
+                                    slaCalculatorMemory.get(jobId).getSLARegistrationBean());
+                            Assert.assertEquals("SLACalcStatus.retryCount should have been increased",
+                                    2, slaCalculatorMemory.get(jobId).getRetryCount());
+                        } catch (JPAExecutorException ignored) {
+                        }
+                    }
+                }
+        );
+
+        updateJobSlaFailing(slaCalculatorMemory, jobId,
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Assert.assertNotNull("after third update, SLACalcStatus should still be present",
+                                    slaCalculatorMemory.get(slaRegistration.getId()));
+                            Assert.assertEquals("updating SLA_REGISTRATION should have been failed",
+                                    slaRegistration,
+                                    slaCalculatorMemory.get(jobId).getSLARegistrationBean());
+                            Assert.assertEquals("SLACalcStatus.retryCount should have been increased",
+                                    3, slaCalculatorMemory.get(jobId).getRetryCount());
+                        } catch (JPAExecutorException ignored) {
+                        }
+                    }
+                }
+        );
+
+        updateJobSlaFailing(slaCalculatorMemory, jobId,
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Assert.assertNull("after fourth update, SLACalcStatus should no more be present",
+                                    slaCalculatorMemory.get(slaRegistration.getId()));
+                        } catch (JPAExecutorException ignored) {
+                        }
+                    }
+                }
+        );
+    }
+
+    private void updateJobSlaFailing(final SLACalculatorMemory slaCalculatorMemory,
+                                     final String jobId,
+                                     final Runnable assertsWhenFailing)
+            throws Exception {
+        try {
+            FailingDBHelperForTest.setDbPredicate(new SLASummarySelectPredicate(1));
+            prepareFailingDB();
+
+            slaCalculatorMemory.updateJobSla(jobId);
+        }
+        catch (final JPAExecutorException e) {
+            assertsWhenFailing.run();
+        }
+        finally {
+            FailingDBHelperForTest.resetDbPredicate();
+            System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER);
+        }
     }
 
     private void addAndUpdateRegistration(final SLACalculatorMemory slaCalcMemory, final String jobId,
@@ -1168,12 +1261,12 @@ public class TestSLACalculatorMemory extends XDataTestCase {
                                                                final SLARegistrationBean slaRegBean,
                                                                final SLARegistrationBean slaRegBean2) throws Exception {
         slaCalcMemory.addRegistration(jobId, slaRegBean);
-        FailingDBHelperForTest.setDbPredicate(new SLARegistrationDmlPredicate());
+        FailingDBHelperForTest.setDbPredicate(new SLARegistrationInsertUpdatePredicate());
         prepareFailingDB();
         slaCalcMemory.updateRegistration(jobId, slaRegBean2);
     }
 
-    private void prepareFailingDB() throws Exception {
+    private void prepareFailingDB() throws ServiceException {
         System.setProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER, Boolean.TRUE.toString());
         Configuration conf = services.get(ConfigurationService.class).getConf();
         conf.set(JPAService.CONF_DRIVER, AlwaysFailingHSQLDriverMapper.class.getCanonicalName());
@@ -1182,22 +1275,56 @@ public class TestSLACalculatorMemory extends XDataTestCase {
         jpaService.init(services);
     }
 
-    static class SLARegistrationDmlPredicate implements com.google.common.base.Predicate<String> {
-        private static final String TABLE_NAME = "SLA_REGISTRATION";
-        private static final Set<String> OPERATIONS = Sets.newHashSet("INSERT INTO ", "UPDATE ");
+    static class SLARegistrationInsertUpdatePredicate extends DmlPredicate {
+        SLARegistrationInsertUpdatePredicate() {
+            super("SLA_REGISTRATION", Sets.newHashSet("INSERT INTO ", "UPDATE "));
+        }
+    }
+
+    static class SLASummarySelectPredicate extends DmlPredicate {
+        private int remainingSuccessfulAttempts;
+        SLASummarySelectPredicate(final int remainingSuccessfulAttempts) {
+            super("SLA_SUMMARY", Sets.newHashSet("SELECT "));
+            this.remainingSuccessfulAttempts = remainingSuccessfulAttempts;
+        }
+
+        @Override
+        public boolean apply(@Nullable String input) {
+            if (super.apply(input)) {
+                if (remainingSuccessfulAttempts <= 0) {
+                    return true;
+                }
+                else {
+                    remainingSuccessfulAttempts--;
+                    return false;
+                }
+            }
+            else {
+                return false;
+            }
+        }
+    }
+
+    static class DmlPredicate implements com.google.common.base.Predicate<String> {
+        private final String tableName;
+        private final Set<String> operationPrefixes;
+
+        DmlPredicate(final String tableName, final Set<String> operationPrefixes) {
+            this.tableName = tableName;
+            this.operationPrefixes = operationPrefixes;
+        }
 
         @Override
         public boolean apply(@Nullable String input) {
             Preconditions.checkArgument(!Strings.isNullOrEmpty(input));
             boolean operationMatch = false;
-            for (String s: OPERATIONS) {
+            for (String s: operationPrefixes) {
                 if (input.startsWith(s)) {
                     operationMatch = true;
                     break;
                 }
             }
-            return operationMatch && input.toUpperCase().contains(TABLE_NAME);
+            return operationMatch && input.toUpperCase().contains(tableName);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/docs/src/site/twiki/DG_SLAMonitoring.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SLAMonitoring.twiki b/docs/src/site/twiki/DG_SLAMonitoring.twiki
index 29dd395..c91c227 100644
--- a/docs/src/site/twiki/DG_SLAMonitoring.twiki
+++ b/docs/src/site/twiki/DG_SLAMonitoring.twiki
@@ -408,6 +408,17 @@ Refer [[DG_CommandLineTool#Changing_job_SLA_definition_and_alerting][Changing jo
 ---++++ 4. Change using REST API
 Refer the REST API [[WebServicesAPI#Changing_job_SLA_definition_and_alerting][Changing job SLA definition and alerting]].
 
+---++ In-memory SLA entries and database content
+
+There are special circumstances in which in-memory =SLACalcStatus= entries can exist without the corresponding workflow or
+coordinator job or action instances in the database. For example:
+   * SLA tracked database content may already have been deleted, and the =SLA_SUMMARY= entry is no longer present in the database
+   * SLA tracked database content and the =SLA_SUMMARY= entry aren't yet present in the database
+
+Each time the =SLAService= scheduled job runs, SLA map contents are checked. When the =SLA_SUMMARY= entry for an in-memory
+SLA entry is missing, a retry counter is increased; it is reset once the entry is found again. If the entry is still missing after
+this counter has reached the server-wide value =oozie.sla.service.SLAService.maximum.retry.count= (by default =3=), it is purged.
+
 ---++ Known issues
 There are two known issues when you define SLA for a workflow action.
    * If there are decision nodes and SLA is defined for a workflow action not in the execution path because of the decision node, you will still get an SLA_MISS notification.

http://git-wip-us.apache.org/repos/asf/oozie/blob/87b3d0ff/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 3e7e98d..a93be9d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3260 [sla] Remove stale item above max retries on JPA related errors from in-memory SLA map (andras.piros)
 OOZIE-3233 Remove DST shift from the coordinator job's end time (kmarton via andras.piros)
 OOZIE-1393 Allow sending emails via TLS (mbalakrishnan, dionusos via andras.piros)
 OOZIE-3156 Retry SSH action check when cannot connect to remote host (txsing via andras.piros)