You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/08/09 10:06:50 UTC
oozie git commit: OOZIE-3304 Parsing sharelib timestamps is not
threadsafe (dionusos, matijhs via andras.piros)
Repository: oozie
Updated Branches:
refs/heads/master e229e4dbb -> 6dedac608
OOZIE-3304 Parsing sharelib timestamps is not threadsafe (dionusos, matijhs via andras.piros)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6dedac60
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6dedac60
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6dedac60
Branch: refs/heads/master
Commit: 6dedac6082c1cc3c4012a3c279632329a8b30482
Parents: e229e4d
Author: Andras Piros <an...@cloudera.com>
Authored: Thu Aug 9 13:05:54 2018 +0300
Committer: Andras Piros <an...@cloudera.com>
Committed: Thu Aug 9 13:05:54 2018 +0300
----------------------------------------------------------------------
.../apache/oozie/service/ShareLibService.java | 18 +-
.../oozie/service/TestHAShareLibService.java | 2 +-
.../oozie/service/TestShareLibService.java | 166 ++++++++++++++++++-
release-log.txt | 1 +
4 files changed, 174 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/main/java/org/apache/oozie/service/ShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ShareLibService.java b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
index a901567..b88dab3 100644
--- a/core/src/main/java/org/apache/oozie/service/ShareLibService.java
+++ b/core/src/main/java/org/apache/oozie/service/ShareLibService.java
@@ -85,8 +85,6 @@ public class ShareLibService implements Service, Instrumentable {
public static final String SHARE_LIB_PREFIX = "lib_";
- public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-
private Services services;
private Map<String, List<Path>> shareLibMap = new HashMap<String, List<Path>>();
@@ -119,6 +117,14 @@ public class ShareLibService implements Service, Instrumentable {
final long retentionTime = 1000L * 60 * 60 * 24 * ConfigurationService.getInt(LAUNCHERJAR_LIB_RETENTION);
+ /**
+ * Per-thread "yyyyMMddHHmmss" formatter used to parse sharelib directory suffixes and to name launcherlib dirs.
+ * SimpleDateFormat is not thread-safe, so every thread lazily gets its own instance (JVM default time zone).
+ */
+ @VisibleForTesting
+ protected static final ThreadLocal<SimpleDateFormat> dt =
+ ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmss"));
+
@Override
public void init(Services services) throws ServiceException {
this.services = services;
@@ -519,7 +525,7 @@ public class ShareLibService implements Service, Instrumentable {
String time = name.substring(prefix.length());
Date d = null;
try {
- d = dateFormat.parse(time);
+ d = dt.get().parse(time);
}
catch (ParseException e) {
return false;
@@ -707,7 +713,7 @@ public class ShareLibService implements Service, Instrumentable {
* @return the launcherlib path
*/
private Path getLauncherlibPath() {
- String formattedDate = dateFormat.format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
+ String formattedDate = dt.get().format(Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime());
Path tmpLauncherLibPath = new Path(services.get(WorkflowAppService.class).getSystemLibPath(), LAUNCHER_LIB_PREFIX
+ formattedDate);
return tmpLauncherLibPath;
@@ -733,11 +739,11 @@ public class ShareLibService implements Service, Instrumentable {
FileStatus[] files = fs.listStatus(rootDir, directoryFilter);
for (FileStatus file : files) {
- String name = file.getPath().getName().toString();
+ String name = file.getPath().getName();
String time = name.substring(prefix.length());
Date d = null;
try {
- d = dateFormat.parse(time);
+ d = dt.get().parse(time);
}
catch (ParseException e) {
continue;
http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java b/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java
index 5087505..144d0c5 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHAShareLibService.java
@@ -69,7 +69,7 @@ public class TestHAShareLibService extends ZKXTestCase {
Date time = new Date(System.currentTimeMillis());
Path basePath = new Path(Services.get().getConf().get(WorkflowAppService.SYSTEM_LIB_PATH));
- Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time));
+ Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/core/src/test/java/org/apache/oozie/service/TestShareLibService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestShareLibService.java b/core/src/test/java/org/apache/oozie/service/TestShareLibService.java
index d244166..95dab5c 100644
--- a/core/src/test/java/org/apache/oozie/service/TestShareLibService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestShareLibService.java
@@ -24,21 +24,33 @@ import java.io.PrintWriter;
import java.net.URI;
import java.net.URLDecoder;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import com.google.common.collect.Lists;
import com.google.common.io.Files;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.hadoop.ActionExecutorTestCase.Context;
@@ -53,9 +65,10 @@ import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
+import org.junit.Assert;
import org.junit.Test;
-import com.google.common.collect.Lists;
+import org.mockito.Mockito;
public class TestShareLibService extends XFsTestCase {
private static final String HDFS_SCHEME_PREFIX = "hdfs";
@@ -210,7 +223,7 @@ public class TestShareLibService extends XFsTestCase {
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
- Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time));
+ Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
@@ -381,7 +394,7 @@ public class TestShareLibService extends XFsTestCase {
// Use timedstamped directory if available
Date time = new Date(System.currentTimeMillis());
- Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time));
+ Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
@@ -483,7 +496,7 @@ public class TestShareLibService extends XFsTestCase {
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX
- + ShareLibService.dateFormat.format(time));
+ + ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
@@ -545,7 +558,7 @@ public class TestShareLibService extends XFsTestCase {
Path basePath = new Path(getOozieConfig()
.get(WorkflowAppService.SYSTEM_LIB_PATH));
Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX
- + ShareLibService.dateFormat.format(time));
+ + ShareLibService.dt.get().format(time));
fs.mkdirs(libpath);
Path ooziePath = new Path(libpath.toString() + Path.SEPARATOR + "oozie");
fs.mkdirs(ooziePath);
@@ -630,7 +643,7 @@ public class TestShareLibService extends XFsTestCase {
// Use timedstamped directory if available
Date time = new Date(System.currentTimeMillis());
- Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dateFormat.format(time));
+ Path libpath = new Path(basePath, ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(time));
Path pigPath = new Path(libpath.toString() + Path.SEPARATOR + "pig");
createDirs(fs, pigPath, new Path(pigPath, "temp"));
@@ -1010,6 +1023,147 @@ public class TestShareLibService extends XFsTestCase {
}
}
+ /**
+ * Resolves the latest sharelib path concurrently over many identically-timestamped sharelib directories.
+ * With a shared SimpleDateFormat, concurrent parsing could fail; every lookup must now succeed.
+ */
+ @Test
+ public void testParsingALotOfShareLibsParallel() throws ServiceException, IOException {
+ setShipLauncherInOozieConfig();
+ services.init();
+ // destroying, as we don't want the sharelib dirs purge to be scheduled
+ services.get(SchedulerService.class).destroy();
+
+ final List<FileStatus> fileStatuses = new ArrayList<>();
+
+ final Path rootDir = Mockito.mock(Path.class);
+ final FileSystem fs = Mockito.mock(FileSystem.class);
+
+ final int numberOfFileStatuses = 100;
+
+ for (int i = 0; i < numberOfFileStatuses; ++i) {
+ createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 0, 1);
+ }
+
+ final FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+ Mockito.when(fs.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))).thenReturn(statuses);
+
+ final ShareLibService shareLibService = services.get(ShareLibService.class);
+ shareLibService.fs = fs;
+
+ runGivenCallableOnThreads(() -> {
+ try {
+ final Path latest = shareLibService.getLatestLibPath(rootDir, ShareLibService.SHARE_LIB_PREFIX);
+ Assert.assertNotNull("Latest share lib path shall be determined", latest);
+ } catch (final IOException | NumberFormatException e) {
+ // Report failure via the return value; interrupting the pooled worker thread would be meaningless.
+ log.error(e.getMessage(), e);
+ return false;
+ }
+ return true;
+ }, 10, 10);
+ }
+
+ /** Baseline without concurrency: a single worker thread resolves the latest sharelib path. */
+ @Test
+ public void testDeterminingLatestSharelibPathOn1Thread() throws IOException, ServiceException {
+ final int threadCount = 1;
+ testDeterminingLatestSharelibPath(threadCount);
+ }
+
+ /** Moderate concurrency: five worker threads resolve the latest sharelib path in parallel. */
+ @Test
+ public void testDeterminingLatestSharelibPathOn5Threads() throws IOException, ServiceException {
+ final int threadCount = 5;
+ testDeterminingLatestSharelibPath(threadCount);
+ }
+
+ /** Higher concurrency: ten worker threads resolve the latest sharelib path in parallel. */
+ @Test
+ public void testDeterminingLatestSharelibPathOn10Threads() throws IOException, ServiceException {
+ final int threadCount = 10;
+ testDeterminingLatestSharelibPath(threadCount);
+ }
+
+ /**
+ * Checks that the newest of three timestamped sharelib directories is chosen, resolving it 100 times
+ * on a pool of {@code numberOfThreads} threads.
+ *
+ * @param numberOfThreads size of the thread pool running the lookups
+ */
+ private void testDeterminingLatestSharelibPath(final int numberOfThreads) throws ServiceException, IOException {
+ setShipLauncherInOozieConfig();
+ services.init();
+ // destroying, as we don't want the sharelib dirs purge to be scheduled
+ services.get(SchedulerService.class).destroy();
+
+ final List<FileStatus> fileStatuses = new ArrayList<>();
+ createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 0, 1);
+ createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 0, 1, 1);
+ // 01:00:01 is later than 00:00:01 and 00:01:01, so this one is expected to be picked
+ final Path expectedLatest = createAndAddMockedFileStatus(fileStatuses, 2018, 8, 1, 1, 0, 1);
+
+ final Path rootDir = Mockito.mock(Path.class);
+ final FileSystem fs = Mockito.mock(FileSystem.class);
+
+ final FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]);
+ Mockito.when(fs.listStatus(Mockito.any(Path.class), Mockito.any(PathFilter.class))).thenReturn(statuses);
+
+ final ShareLibService shareLibService = services.get(ShareLibService.class);
+ shareLibService.fs = fs;
+
+ runGivenCallableOnThreads(() -> {
+ try {
+ final Path path = shareLibService.getLatestLibPath(rootDir, ShareLibService.SHARE_LIB_PREFIX);
+ Assert.assertEquals(expectedLatest, path);
+ } catch (final IOException | NumberFormatException e) {
+ // Report failure via the return value; interrupting the pooled worker thread would be meaningless.
+ log.error(e.getMessage(), e);
+ return false;
+ }
+ return true;
+ }, 100, numberOfThreads);
+ }
+
+ /**
+ * Runs {@code numberOfCallables} copies of {@code callable} on a fixed pool of {@code numberOfThreads} threads
+ * and fails the test unless every invocation completes and returns {@code true}.
+ *
+ * @param callable task to run concurrently; returns {@code false} to signal failure
+ * @param numberOfCallables how many times the task is submitted
+ * @param numberOfThreads size of the thread pool
+ */
+ private void runGivenCallableOnThreads(
+ final Callable<Boolean> callable, final int numberOfCallables, final int numberOfThreads) {
+ final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
+ final List<Callable<Boolean>> callableTasks = new ArrayList<>(numberOfCallables);
+
+ for (int i = 0; i < numberOfCallables; ++i) {
+ callableTasks.add(callable);
+ }
+
+ // Run the tasks in parallel on numberOfThreads threads; the issue was reproducible with the old shared formatter.
+ // invokeAll() returns only after every task has completed, normally or exceptionally.
+ final List<Future<Boolean>> futures;
+ try {
+ futures = executor.invokeAll(callableTasks);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error(e.getMessage(), e);
+ Assert.fail("Determining timestamp of a share lib name is failed with: " + e.getMessage());
+ return;
+ } finally {
+ // Always release the pool threads, also when invokeAll() was interrupted and the test is failing.
+ awaitTerminationAfterShutdown(executor);
+ }
+
+ Assert.assertFalse(futures.isEmpty());
+ for (final Future<Boolean> f : futures) {
+ try {
+ final Boolean result = f.get(5, TimeUnit.SECONDS);
+ Assert.assertTrue("Parsed share lib name shall be a valid timestamp", result);
+ } catch (final InterruptedException | ExecutionException | TimeoutException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ log.error(e.getMessage(), e);
+ Assert.fail("Determining timestamp of a share lib name is failed with: " + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Creates a mocked {@link FileStatus} whose path name is {@code SHARE_LIB_PREFIX} followed by the given timestamp,
+ * appends it to {@code fileStatuses} and returns the mocked path so callers can compare against it.
+ *
+ * @param month 1-based month (1 = January); converted because {@link Calendar.Builder#setDate} is 0-based
+ */
+ private Path createAndAddMockedFileStatus(final List<FileStatus> fileStatuses, final int year, final int month,
+ final int day, final int hour, final int minute, final int second) {
+ final Date timestamp = new Calendar.Builder()
+ .setDate(year, month - 1, day)
+ .setTimeOfDay(hour, minute, second)
+ .build()
+ .getTime();
+ // Same formatter the service parses with, so the generated name always matches its expected pattern
+ final String libName = ShareLibService.SHARE_LIB_PREFIX + ShareLibService.dt.get().format(timestamp);
+ final Path filePath = Mockito.mock(Path.class);
+ Mockito.when(filePath.getName()).thenReturn(libName);
+ final FileStatus fileStatus = Mockito.mock(FileStatus.class);
+ Mockito.when(fileStatus.getPath()).thenReturn(filePath);
+ fileStatuses.add(fileStatus);
+ return filePath;
+ }
+
+ /**
+ * Initiates an orderly shutdown and waits up to 10 seconds for running tasks. Falls back to shutdownNow()
+ * on timeout or interruption, restoring the caller's interrupt flag in the latter case.
+ */
+ private void awaitTerminationAfterShutdown(ExecutorService threadPool) {
+ threadPool.shutdown();
+ boolean terminated = false;
+ boolean interrupted = false;
+ try {
+ terminated = threadPool.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ // handled below: force shutdown first, then re-assert the interrupt status
+ interrupted = true;
+ }
+ if (!terminated) {
+ threadPool.shutdownNow();
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private void setupSharelibConf(final String file, final String tag) throws ServiceException, IOException {
Properties prop = new Properties();
prop.put(tag, TEST_HDFS_HOME + SHARELIB_PATH);
http://git-wip-us.apache.org/repos/asf/oozie/blob/6dedac60/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 310f78a..5708d1e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3304 Parsing sharelib timestamps is not threadsafe (dionusos, matijhs via andras.piros)
OOZIE-3314 Remove findbugs-filter.xml and convert its contents to annotations (asalamon74 via andras.piros)
OOZIE-3321 PySpark example fails (daniel.becker via andras.piros)
OOZIE-3315 DateList example fails (daniel.becker via andras.piros)