You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by di...@apache.org on 2022/12/07 11:36:08 UTC

[oozie] branch master updated: OOZIE-3679 Correct maximum wait time between database retry attempts property (jmakai via dionusos)

This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 83f9c9e13 OOZIE-3679 Correct maximum wait time between database retry attempts property (jmakai via dionusos)
83f9c9e13 is described below

commit 83f9c9e1341d96f3c105904d47f134215210c6d8
Author: Denes Bodo <di...@apache.org>
AuthorDate: Wed Dec 7 12:14:55 2022 +0100

    OOZIE-3679 Correct maximum wait time between database retry attempts property (jmakai via dionusos)
---
 .../java/org/apache/oozie/service/JPAService.java  |   6 +-
 .../org/apache/oozie/service/TestJPAService.java   | 319 +++++++++++++++++++--
 release-log.txt                                    |   1 +
 3 files changed, 307 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/service/JPAService.java b/core/src/main/java/org/apache/oozie/service/JPAService.java
index 69f0793cc..052edaf82 100644
--- a/core/src/main/java/org/apache/oozie/service/JPAService.java
+++ b/core/src/main/java/org/apache/oozie/service/JPAService.java
@@ -90,7 +90,8 @@ public class JPAService implements Service, Instrumentable {
     public static final String CONF_VALIDATE_DB_CONN_EVICTION_NUM = CONF_PREFIX + "validate.db.connection.eviction.num";
     public static final String CONF_OPENJPA_BROKER_IMPL = CONF_PREFIX + "openjpa.BrokerImpl";
     public static final String INITIAL_WAIT_TIME = CONF_PREFIX + "retry.initial-wait-time.ms";
-    public static final String MAX_WAIT_TIME = CONF_PREFIX + "maximum-wait-time.ms";
+    public static final String MAX_WAIT_TIME = CONF_PREFIX + "retry.maximum-wait-time.ms";
+    public static final String MAX_WAIT_TIME_DEPRECATED = CONF_PREFIX + "maximum-wait-time.ms";
     public static final String MAX_RETRY_COUNT = CONF_PREFIX + "retry.max-retries";
     public static final String SKIP_COMMIT_FAULT_INJECTION_CLASS = SkipCommitFaultInjection.class.getName();
 
@@ -259,7 +260,8 @@ public class JPAService implements Service, Instrumentable {
 
     private void initRetryHandler() {
         final long initialWaitTime = ConfigurationService.getInt(INITIAL_WAIT_TIME, (int) DEFAULT_INITIAL_WAIT_TIME);
-        final long maxWaitTime = ConfigurationService.getInt(MAX_WAIT_TIME, (int) DEFAULT_MAX_WAIT_TIME);
+        final long maxWaitTime = ConfigurationService.getInt(MAX_WAIT_TIME_DEPRECATED,
+                ConfigurationService.getInt(MAX_WAIT_TIME, (int) DEFAULT_MAX_WAIT_TIME));
         final int maxRetryCount = ConfigurationService.getInt(MAX_RETRY_COUNT, DEFAULT_MAX_RETRY_COUNT);
 
         LOG.info(XLog.STD, "Failing database operations will be retried {0} times, with an initial sleep time of {1} ms,"
diff --git a/core/src/test/java/org/apache/oozie/service/TestJPAService.java b/core/src/test/java/org/apache/oozie/service/TestJPAService.java
index bb4c90d91..705bebff6 100644
--- a/core/src/test/java/org/apache/oozie/service/TestJPAService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestJPAService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,41 +18,326 @@
 
 package org.apache.oozie.service;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.compression.CodecFactory;
 import org.apache.oozie.executor.jpa.JPAExecutor;
-import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.IOUtils;
+import org.apache.openjpa.conf.OpenJPAConfiguration;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerSPI;
+import org.apache.openjpa.persistence.OpenJPAEntityTransaction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
 import javax.persistence.EntityManager;
+import javax.persistence.Persistence;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import static org.apache.oozie.service.JPAService.CONF_PASSWORD;
+import static org.apache.oozie.service.JPAService.CONF_URL;
+import static org.apache.oozie.service.JPAService.INITIAL_WAIT_TIME;
+import static org.apache.oozie.service.JPAService.MAX_RETRY_COUNT;
+import static org.apache.oozie.service.JPAService.MAX_WAIT_TIME;
+import static org.apache.oozie.service.JPAService.MAX_WAIT_TIME_DEPRECATED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+
+@RunWith(Parameterized.class)
+public class TestJPAService {
 
-public class TestJPAService extends XTestCase {
+    private MockedStatic<Services> SERVICES;
+    private MockedStatic<IOUtils> IOUTILS;
+    private MockedStatic<Persistence> PERSISTENCE;
+    private MockedStatic<CodecFactory> CODEFACTORY;
 
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        new Services().init();
+    private TestJPAServiceInput testJPAServiceInput;
+
+    public TestJPAService(TestJPAServiceInput testJPAServiceInput) {
+        this.testJPAServiceInput = testJPAServiceInput;
     }
 
-    @Override
-    protected void tearDown() throws Exception {
-        Services.get().destroy();
-        super.tearDown();
+    /* Defining the actual test cases below.
+    *
+    * Explanation of the content of the fields:
+    * - Configuration: the configuration properties to be used when initializing the JPA Service (in our case they are
+    * populated with the JPA service's retry mechanism related properties)
+    * - JPAExecutor: the JPA executor which is run by the JPA Service. In our cases there are two types of them:
+    * a passing (MyPassingJPAExecutor) and a failing (MyFailingJPAExecutor) one. The latter is responsible for
+    * activating the JPA Service's retry mechanism.
+    * - executedTimes: the property indicating the number of times the provided JPA Executor needs to be executed
+    * according to the values of the JPA Service's retry mechanism related configuration.
+    * - minElapsedTime: the minimum amount of time in ms that must elapse between the first and last (retry)
+    * execution (see below for how this is calculated).
+    * - maxElapsedTime: the maximum amount of time in ms that may elapse between the first and last (retry)
+    * execution (see below for how this is calculated).
+    *
+    * Explanation on how the expected elapsed time is being calculated:
+    * Due to the JPA Service's retry mechanism, after an unsuccessful try, it waits for `initialWaitTime` amount of time
+    * before the next try. If the second try is not successful either, then it waits for double the `initialWaitTime`
+    * before the third try. This wait time is being doubled until the `maxWaitTime` is reached. Then the value of
+    * `maxWaitTime` will be used between the tries until the maximum number of tries `maxRetryCount` is reached.
+    * Therefore the minimum elapsed time is the sum of these waits (e.g. 100 + 200 + 200 = 500 ms for 4 tries with a
+    * 100 ms initial and a 200 ms maximum wait time). The maximum elapsed time, however, is only an approximate upper
+    * bound that should not be exceeded given the JPA Service's retry related configuration.
+    *
+    * */
+    public static final TestJPAServiceInput testJPAServiceInputPassing = new TestJPAServiceInput(
+            createConfiguration(100, 30000, null, 10),
+            new MyPassingJPAExecutor(),
+            1);
+
+    public static final TestJPAServiceInput testJPAServiceInputFailingWithDefaultRetryConfig = new TestJPAServiceInput(
+            createConfiguration(100, 30000, null, 10),
+            new MyFailingJPAExecutor(),
+            10,
+            51100,
+            52000);
+
+    public static final TestJPAServiceInput testJPAServiceInputFailinhWithOnlyCorrectRetryWaitTimeConfig =
+            new TestJPAServiceInput(
+                    createConfiguration(100, 200, null, 4),
+                    new MyFailingJPAExecutor(),
+                    4,
+                    500,
+                    600);
+
+    public static final TestJPAServiceInput testJPAServiceInputFailinhWithBothRetryWaitTimeConfig = new TestJPAServiceInput(
+            createConfiguration(100, 10000, 200, 4),
+            new MyFailingJPAExecutor(),
+            4,
+            500,
+            600);
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> testCases() {
+       List<TestJPAServiceInput> testJPAServiceInputs = Arrays.asList(
+               testJPAServiceInputPassing,
+               testJPAServiceInputFailingWithDefaultRetryConfig,
+               testJPAServiceInputFailinhWithOnlyCorrectRetryWaitTimeConfig,
+               testJPAServiceInputFailinhWithBothRetryWaitTimeConfig
+       );
+
+        Collection<Object[]> result = new ArrayList<>();
+        for (TestJPAServiceInput testJPAServiceInput : testJPAServiceInputs) {
+            result.add(new Object[] { testJPAServiceInput });
+        }
+
+        return result;
     }
 
     public static class MyJPAExecutor implements JPAExecutor<String> {
+        List<Date> dates = new ArrayList<>();
+
         @Override
         public String getName() {
-            return "my";
+            return "foobar";
         }
 
+        @Override
+        public String execute(EntityManager em) {
+            return null;
+        }
+    }
+
+    public static class MyPassingJPAExecutor extends MyJPAExecutor {
         @Override
         public String execute(EntityManager em) {
             assertNotNull(em);
             return "ret";
         }
     }
-    public void testExecute() throws Exception {
-        JPAService jpaService = Services.get().get(JPAService.class);
-        assertNotNull(jpaService);
-        String ret = jpaService.execute(new MyJPAExecutor());
-        assertEquals("ret", ret);
+
+    public static class MyFailingJPAExecutor extends MyJPAExecutor {
+        @Override
+        public String execute(EntityManager em) {
+            // saving the dates of the executions
+            Date date = new Date();
+            dates.add(date);
+
+            // intentionally throwing a not blacklisted exception to activate the retry mechanism
+            throw new UnsupportedOperationException("Intentionally thrown exception to activate the JPA Service's " +
+                    "retry mechanism...");
+        }
+    }
+
+    private static class TestJPAServiceInput {
+
+        private Configuration conf;
+        private JPAExecutor<String> jpaExecutor;
+        private int executedTimes;
+        private Long minElapsedTime = null;
+        private Long maxElapsedTime = null;
+
+        public TestJPAServiceInput(Configuration conf, JPAExecutor<String> jpaExecutor, int executedTimes) {
+            this.conf = conf;
+            this.jpaExecutor = jpaExecutor;
+            this.executedTimes = executedTimes;
+        }
+
+        public TestJPAServiceInput(Configuration conf, JPAExecutor<String> jpaExecutor, int executedTimes, long minElapsedTime,
+                                   long maxElapsedTime) {
+            this.conf = conf;
+            this.jpaExecutor = jpaExecutor;
+            this.executedTimes = executedTimes;
+            this.minElapsedTime = minElapsedTime;
+            this.maxElapsedTime = maxElapsedTime;
+        }
+    }
+
+    /*
+    * Tests the JPA service's execution of a JPA executor and, for a failing executor, its retry mechanism.
+    */
+    @Test
+    public void testJPAService() {
+        JPAService mockedJPAService = null;
+        try {
+            // adding the additional required configuration properties
+            setAdditionalRequiredConfProps(testJPAServiceInput.conf);
+            mockedJPAService = createMockedJPAService(testJPAServiceInput.conf);
+            MyJPAExecutor myJPAExecutor = (MyJPAExecutor) Mockito.spy(testJPAServiceInput.jpaExecutor);
+
+            String ret = null;
+            try {
+                 ret = mockedJPAService.execute(myJPAExecutor);
+            } catch (Exception e) {
+                // no-op
+            }
+
+            Mockito.verify(myJPAExecutor, Mockito.times(testJPAServiceInput.executedTimes)).execute(Mockito.any());
+
+            if (testJPAServiceInput.jpaExecutor instanceof MyPassingJPAExecutor) {
+                assertEquals("The return value should have been `ret`", "ret", ret);
+                // if the JPA executor passes, there is no retry mechanism to check, so we're done here
+                return;
+            }
+
+            // saving the elapsed time between the first and last try
+            long elapsedTimeBetweenFirstAndLastExecution =
+                    myJPAExecutor.dates.get(testJPAServiceInput.executedTimes - 1).getTime()
+                            - myJPAExecutor.dates.get(0).getTime();
+
+            assertTrue("The elapsed time between the first and last execution should took minimum ["
+                            + testJPAServiceInput.minElapsedTime + "ms] but it took instead ["
+                            + elapsedTimeBetweenFirstAndLastExecution + "ms].",
+                    elapsedTimeBetweenFirstAndLastExecution > testJPAServiceInput.minElapsedTime);
+
+            assertTrue("The elapsed time between the first and last execution should took maximum ["
+                            + testJPAServiceInput.maxElapsedTime + "ms] but it took instead ["
+                            + elapsedTimeBetweenFirstAndLastExecution + "ms].",
+                    elapsedTimeBetweenFirstAndLastExecution < testJPAServiceInput.maxElapsedTime);
+
+        } catch (Exception e) {
+           throw new RuntimeException(e);
+        } finally {
+            if (mockedJPAService != null) {
+                mockedJPAService.destroy();
+            }
+            closeStaticMocks();
+        }
+    }
+
+    private static Configuration createConfiguration(Integer initialWaitTime, Integer maxWaitTime, Integer maxWaitTimeDeprecated,
+                                                     Integer maxRetryCount) {
+        Configuration conf = new Configuration();
+        setIntToConfIfNotNull(conf, INITIAL_WAIT_TIME, initialWaitTime);
+        setIntToConfIfNotNull(conf, MAX_WAIT_TIME, maxWaitTime);
+        setIntToConfIfNotNull(conf, MAX_WAIT_TIME_DEPRECATED, maxWaitTimeDeprecated);
+        setIntToConfIfNotNull(conf, MAX_RETRY_COUNT, maxRetryCount);
+        return conf;
+    }
+
+    private static void setIntToConfIfNotNull(Configuration conf, String key, Integer value) {
+        if (value != null) {
+            conf.setInt(key, value);
+        }
+    }
+
+    private void setAdditionalRequiredConfProps(Configuration conf) {
+        conf.set(CONF_PASSWORD, "foobar");
+        conf.set(CONF_URL, "jdbc:foo:bar");
     }
 
+    private JPAService createMockedJPAService(Configuration conf) throws ServiceException {
+        Services mockServices = Mockito.mock(Services.class);
+        doReturn(conf).when(mockServices).getConf();
+
+        IOUTILS = Mockito.mockStatic(IOUtils.class);
+        InputStream mockInputStream = Mockito.mock(InputStream.class);
+        IOUTILS.when(() -> IOUtils.getResourceAsStream(Mockito.anyString(), Mockito.anyInt())).thenReturn(mockInputStream);
+
+        SERVICES = Mockito.mockStatic(Services.class);
+        SERVICES.when(Services::get).thenReturn(mockServices);
+
+        PERSISTENCE = Mockito.mockStatic(Persistence.class);
+        OpenJPAEntityTransaction mockEntityTransaction = Mockito.mock(OpenJPAEntityTransaction.class);
+        OpenJPAEntityManagerSPI mockEntityManager = Mockito.mock(OpenJPAEntityManagerSPI.class);
+        OpenJPAEntityManagerFactorySPI mockEntityManagerFactory = Mockito.mock(OpenJPAEntityManagerFactorySPI.class);
+        OpenJPAConfiguration mockOpenJPAConfiguration = Mockito.mock(OpenJPAConfiguration.class);
+        Mockito.when(mockEntityTransaction.isActive()).thenReturn(false);
+        Mockito.when(mockEntityManager.getTransaction()).thenReturn(mockEntityTransaction);
+        Mockito.when(mockEntityManagerFactory.createEntityManager()).thenReturn(mockEntityManager);
+        Mockito.when(mockOpenJPAConfiguration.getConnectionProperties()).thenReturn("foobar");
+        Mockito.when(mockEntityManagerFactory.getConfiguration()).thenReturn(mockOpenJPAConfiguration);
+        PERSISTENCE.when(() -> Persistence.createEntityManagerFactory(Mockito.anyString(),
+                Mockito.any())).thenReturn(mockEntityManagerFactory);
+
+        CODEFACTORY = Mockito.mockStatic(CodecFactory.class);
+
+        JPAService jpaService = new JPAService();
+        jpaService.init(mockServices);
+        return jpaService;
+    }
+
+    private void closeStaticMocks() {
+        if (SERVICES != null) {
+            try {
+                SERVICES.close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                SERVICES = null;
+            }
+        }
+
+        if (IOUTILS != null) {
+            try {
+                IOUTILS.close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                IOUTILS = null;
+            }
+        }
+
+        if (PERSISTENCE != null) {
+            try {
+                PERSISTENCE.close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                PERSISTENCE = null;
+            }
+        }
+
+        if (CODEFACTORY != null) {
+            try {
+                CODEFACTORY.close();
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                CODEFACTORY = null;
+            }
+        }
+    }
 }
diff --git a/release-log.txt b/release-log.txt
index 372837af3..a26c622ad 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3679 Correct maximum wait time between database retry attempts property (jmakai via dionusos)
 OOZIE-3606 Extend file system EL functions to use custom file system properties (jmakai via dionusos)
 OOZIE-3677 Oozie should accept a keyStoreType and trustStoreType property in oozie-site.xml (jmakai via dionusos)
 OOZIE-3678 Reduce the number of NameNode access when starting the Yarn job (jmakai via dionusos)