You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/08/09 10:06:50 UTC

oozie git commit: OOZIE-3304 Parsing sharelib timestamps is not threadsafe (dionusos, matijhs via andras.piros)

Repository: oozie
Updated Branches:
  refs/heads/master e229e4dbb -> 6dedac608


OOZIE-3304 Parsing sharelib timestamps is not threadsafe (dionusos, matijhs via andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6dedac60
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6dedac60
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6dedac60

Branch: refs/heads/master
Commit: 6dedac6082c1cc3c4012a3c279632329a8b30482
Parents: e229e4d
Author: Andras Piros <an...@cloudera.com>
Authored: Thu Aug 9 13:05:54 2018 +0300
Committer: Andras Piros <an...@cloudera.com>
Committed: Thu Aug 9 13:05:54 2018 +0300

----------------------------------------------------------------------
 .../apache/oozie/service/ShareLibService.java   |  18 +-
 .../oozie/service/TestHAShareLibService.java    |   2 +-
 .../oozie/service/TestShareLibService.java      | 166 ++++++++++++++++++-
 release-log.txt                                 |   1 +
 4 files changed, 174 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/main/java/org/apache/oozie/service/ShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
index a901567..b88dab3 100644
--- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java
+++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
@@ -85,8 +85,6 @@ public class ShareLibService implements Service, Instrumentable {
 
     public static final String SHARE_LIB_PREFIX = "lib_";
 
-    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-
     private Services services;
 
     private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();
@@ -119,6 +117,14 @@ public class ShareLibService implements Service, Instrumentable {
 
     final long retentionTime = 1000L * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);
 
+    @VisibleForTesting
+    protected static final ThreadLocal<SimpleDateFormat> dt = new ThreadLocal<SimpleDateFormat>() {
+        @Override
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHHmmss");
+        }
+    };
+
     @Override
     public void init(Services services) throws ServiceException {
         this.services = services;
@@ -519,7 +525,7 @@ public class ShareLibService implements Service, Instrumentable {
                     String time = name.substring(prefix.length());
                     Date d = null;
                     try {
-                        d = dateFormat.parse(time);
+                        d = dt.get().parse(time);
                     }
                     catch (ParseException e) {
                         return false;
@@ -707,7 +713,7 @@ public class ShareLibService implements Service, Instrumentable {
      * @return the launcherlib path
      */
     private Path getLauncherlibPath() {
-        String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
+        String formattedDate = dt.get().format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
         Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
                 + formattedDate);
         return tmpLauncherLibPath;
@@ -733,11 +739,11 @@ public class ShareLibService implements Service, Instrumentable {
 
         FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
         for (FileStatus file : files) {
-            String name = file.getPath().getName().toString();
+            String name = file.getPath().getName();
             String time = name.substring(prefix.length());
             Date d = null;
             try {
-                d = dateFormat.parse(time);
+                d = dt.get().parse(time);
             }
             catch (ParseException e) {
                 continue;

http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java b/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java
index 5087505..144d0c5 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java
@@ -69,7 +69,7 @@ public class TestHAShareLibService extends ZKXTestCase {
         Date time = new Date(System.currentTimeMillis());
 
         Path basePath = new Path(Services.get().getConf().get(WorkflowAppService.SYSTEM_LIB_PATH));
-        Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time));
+        Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
         fs.mkdirs(libpath);
 
         Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");

http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/test/java/org/apache/oozie/service/TestShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestShareLibService.java b/core/src/test/java/org/apache/oozie/service/TestShareLibService.java
index d244166..95dab5c 100644
--- a/core/src/test/java/org/apache/oozie/service/TestShareLibService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestShareLibService.java
@@ -24,21 +24,33 @@ import java.io.PrintWriter;
 import java.net.URI;
 import java.net.URLDecoder;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import com.google.common.collect.Lists;
 import com.google.common.io.Files;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.hadoop.ActionExecutorTestCase.Context;
@@ -53,9 +65,10 @@ import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
+import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
+import org.mockito.Mockito;
 
 public class TestShareLibService extends XFsTestCase {
     private static final String HDFS_SCHEME_PREFIX = "hdfs";
@@ -210,7 +223,7 @@ public class TestShareLibService extends XFsTestCase {
 
         Path basePath = new Path(getOozieConfig()
                 .get(WorkflowAppService.SYSTEM_LIB_PATH));
-        Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time));
+        Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
         fs.mkdirs(libpath);
 
         Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
@@ -381,7 +394,7 @@ public class TestShareLibService extends XFsTestCase {
 
         // Use timedstamped directory if available
         Date time = new Date(System.currentTimeMillis());
-        Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time));
+        Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
         fs.mkdirs(libpath);
 
         Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
@@ -483,7 +496,7 @@ public class TestShareLibService extends XFsTestCase {
         Path basePath = new Path(getOozieConfig()
                 .get(WorkflowAppService.SYSTEM_LIB_PATH));
         Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX
-                + ShareLibService.dateFormat.format(time));
+                + ShareLibService.dt.get().format(time));
         fs.mkdirs(libpath);
 
         Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
@@ -545,7 +558,7 @@ public class TestShareLibService extends XFsTestCase {
         Path basePath = new Path(getOozieConfig()
                 .get(WorkflowAppService.SYSTEM_LIB_PATH));
         Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX
-                + ShareLibService.dateFormat.format(time));
+                + ShareLibService.dt.get().format(time));
         fs.mkdirs(libpath);
         Path ooziePath = new Path(libpath.toString() + Path.SEPARATOR + "oozie");
         fs.mkdirs(ooziePath);
@@ -630,7 +643,7 @@ public class TestShareLibService extends XFsTestCase {
 
         // Use timedstamped directory if available
         Date time = new Date(System.currentTimeMillis());
-        Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time));
+        Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
 
         Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
         createDirs(fs, pigPath, new Path(pigPath, "temp"));
@@ -1010,6 +1023,147 @@ public class TestShareLibService extends XFsTestCase {
         }
     }
 
+    @Test
+    public void testParsingALotOfShareLibsParallel() throws ServiceException, IOException {
+        setShipLauncherInOozieConfig();
+        services.init();
+        // destroying the SchedulerService, as we don't want the sharelib dirs purge to be scheduled
+        services.get(SchedulerService.class).destroy();
+
+        final List<FileStatus> fileStatuses = new ArrayList<>();
+
+        final Path rootDir = Mockito.mock(Path.class);
+        final FileSystem fs = Mockito.mock(FileSystem.class);
+
+        final int NUMBER_OF_FILESTATUSES = 100;
+
+        for (int i = 0; i < NUMBER_OF_FILESTATUSES; ++i) {
+            createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 0, 1);
+        }
+
+        final FileStatus[] statuses = fileStatuses.toArray(new FileStatus[1]);
+        Mockito.when(fs.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))).thenReturn(statuses);
+
+        final ShareLibService shareLibService = services.get(ShareLibService.class);
+        shareLibService.fs = fs;
+
+        runGivenCallableOnThreads(() -> {
+            try {
+                shareLibService.getLatestLibPath(rootDir, "lib_");
+            } catch (final IOException | NumberFormatException e) {
+                log.error(e.getMessage());
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }, 10, 10);
+    }
+
+    @Test
+    public void testDeterminingLatestSharelibPathOn1Thread() throws IOException, ServiceException {
+        testDeterminingLatestSharelibPath(1);
+    }
+
+    @Test
+    public void testDeterminingLatestSharelibPathOn5Threads()  throws IOException, ServiceException {
+        testDeterminingLatestSharelibPath(5);
+    }
+
+    @Test
+    public void testDeterminingLatestSharelibPathOn10Threads() throws IOException, ServiceException {
+        testDeterminingLatestSharelibPath(10);
+    }
+
+    private void testDeterminingLatestSharelibPath(final int numberOfThreads) throws ServiceException, IOException {
+        setShipLauncherInOozieConfig();
+        services.init();
+        // destroying the SchedulerService, as we don't want the sharelib dirs purge to be scheduled
+        services.get(SchedulerService.class).destroy();
+
+        final List<FileStatus> fileStatuses = new ArrayList<>();
+        createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 0, 1);
+        createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 1, 1);
+        final Path filePath3 = createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 1, 0, 1);
+
+        final Path rootDir = Mockito.mock(Path.class);
+        final FileSystem fs = Mockito.mock(FileSystem.class);
+
+        final FileStatus[] statuses = fileStatuses.toArray(new FileStatus[1]);
+        Mockito.when(fs.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))).thenReturn(statuses);
+
+        final ShareLibService shareLibService = services.get(ShareLibService.class);
+        shareLibService.fs = fs;
+
+        runGivenCallableOnThreads(() -> {
+            try {
+                final Path path = shareLibService.getLatestLibPath(rootDir, "lib_");
+                Assert.assertEquals(filePath3, path);
+            } catch (final IOException | NumberFormatException e) {
+                log.error(e.getMessage());
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }, 100, numberOfThreads);
+    }
+
+    private void runGivenCallableOnThreads(
+            final Callable<Boolean> callable, final int numberOfCallables, final int numberOfThreads) {
+        final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
+        final List<Callable<Boolean>> callableTasks = new ArrayList<>();
+
+        for (int i = 0; i < numberOfCallables; ++i) {
+            callableTasks.add(callable);
+        }
+
+        // Run the tasks on numberOfThreads threads to parse timestamps in parallel. The issue was experienced with the old code.
+        List<Future<Boolean>> futures = new ArrayList<>();
+        try {
+            futures = executor.invokeAll(callableTasks);
+        } catch (final InterruptedException e) {
+            log.error(e.getMessage());
+            Assert.fail("Determining timestamp of a share lib name is failed with: " + e.getMessage());
+        }
+
+        // Shut down the executor service to have all tasks finished, then collect the results
+        awaitTerminationAfterShutdown(executor);
+
+        Assert.assertFalse(futures.isEmpty());
+        for (final Future<Boolean> f : futures) {
+            try {
+                final Boolean result = f.get(5, TimeUnit.SECONDS);
+                Assert.assertTrue("Parsed share lib name shall be a valid timestamp", result);
+            } catch (final InterruptedException | ExecutionException | TimeoutException e) {
+                log.error(e.getMessage());
+                Assert.fail("Determining timestamp of a share lib name is failed with: " + e.getMessage());
+            }
+        }
+    }
+
+    private Path createAndAddMockedFileStatus(final List<FileStatus> fileStatuses, int y, int m, int d, int h, int min, int s) {
+        final String date = new SimpleDateFormat("yyyyMMddHHmmss").format(
+                new Calendar.Builder().setDate(y, m, d).setTimeOfDay(h, min, s).build().getTime());
+        final Path filePath = Mockito.mock(Path.class);
+        final String libName = ShareLibService.SHARE_LIB_PREFIX + date;
+        Mockito.when(filePath.getName()).thenReturn(libName);
+        final FileStatus fileStatus = Mockito.mock(FileStatus.class);
+        Mockito.when(fileStatus.getPath()).thenReturn(filePath);
+        fileStatuses.add(fileStatus);
+        return filePath;
+    }
+
+    private void awaitTerminationAfterShutdown(ExecutorService threadPool) {
+        threadPool.shutdown();
+        try {
+            if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+                threadPool.shutdownNow();
+            }
+        } catch (InterruptedException ex) {
+            threadPool.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private void setupSharelibConf(final String file, final String tag) throws ServiceException, IOException {
         Properties prop = new Properties();
         prop.put(tag, TEST_HDFS_HOME + SHARELIB_PATH);

http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 310f78a..5708d1e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3304 Parsing sharelib timestamps is not threadsafe (dionusos, matijhs via andras.piros)
 OOZIE-3314 Remove findbugs-filter.xml and convert its contents to annotations (asalamon74 via andras.piros)
 OOZIE-3321 PySpark example fails (daniel.becker via andras.piros)
 OOZIE-3315 DateList example fails (daniel.becker via andras.piros)