You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/05/01 20:49:25 UTC

samza git commit: SAMZA-1243: Fix flaky tests in TestLocalStoreMonitor

Repository: samza
Updated Branches:
  refs/heads/master 94ff28c56 -> 906aa6b88


SAMZA-1243: Fix flaky tests in TestLocalStoreMonitor

Attempts to reproduce this issue were unsuccessful, but careful analysis of the code indicates that the underlying cause is most likely incomplete cleanup of file system directories after test execution, possibly due to incomplete or manual test runs. To address this, the fix generates a different local store directory for every test, isolating file system side effects across different tests and test runs.

Author: Ahmed Abdul Hamid <ah...@ahabdulh-ld1.linkedin.biz>

Reviewers: Shanthoosh Venkataraman <sv...@linkedin.com>

Closes #497 from ahmedahamid/dev/fix-1243


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/906aa6b8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/906aa6b8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/906aa6b8

Branch: refs/heads/master
Commit: 906aa6b88900d8cafe6b7a1fa77c3a569caefc46
Parents: 94ff28c
Author: Ahmed Abdul Hamid <ah...@ahabdulh-ld1.linkedin.biz>
Authored: Tue May 1 13:49:21 2018 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Tue May 1 13:49:21 2018 -0700

----------------------------------------------------------------------
 .../samza/monitor/TestLocalStoreMonitor.java    | 84 ++++++++++----------
 1 file changed, 42 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/906aa6b8/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index 2ad6935..e669ab9 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -20,6 +20,7 @@ package org.apache.samza.monitor;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -39,20 +40,20 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.*;
 
-public class TestLocalStoreMonitor {
 
-  private static File jobDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "samza-test-job/",
-                                        "test-jobName-jobId");
+public class TestLocalStoreMonitor {
 
   private static Logger LOG = LoggerFactory.getLogger(TestLocalStoreMonitor.class);
 
-  private File taskStoreDir = new File(new File(jobDir, "test-store"), "test-task");
+  private File localStoreDir;
+
+  private File jobDir;
 
-  private Map<String, String> config = ImmutableMap.of(LocalStoreMonitorConfig.CONFIG_LOCAL_STORE_DIR,
-      System.getProperty("java.io.tmpdir") + File.separator + "samza-test-job/");
+  private File taskStoreDir;
+
+  private Map<String, String> config;
 
   private LocalStoreMonitor localStoreMonitor;
 
@@ -65,40 +66,45 @@ public class TestLocalStoreMonitor {
 
   @Before
   public void setUp() throws Exception {
+
+    // Create a different local store directory every time the test is executed to avoid
+    // intermittent test failures due to previous unsuccessful test re-runs.
+    localStoreDir = Files.createTempDir();
+    jobDir = new File(localStoreDir, "test-jobName-jobId");
+    taskStoreDir = new File(new File(jobDir, "test-store"), "test-task");
+
+    config = ImmutableMap.of(LocalStoreMonitorConfig.CONFIG_LOCAL_STORE_DIR, localStoreDir.getCanonicalPath());
+
     // Make scaffold directories for testing.
     FileUtils.forceMkdir(taskStoreDir);
     taskStoreSize = taskStoreDir.getTotalSpace();
 
     // Set default return values for methods.
-    Mockito.when(jobsClientMock.getJobStatus(Mockito.any()))
-           .thenReturn(JobStatus.STOPPED);
-    Task task = new Task("localHost", "test-task", "0",
-                         new ArrayList<>(), ImmutableList.of("test-store"));
-    Mockito.when(jobsClientMock.getTasks(Mockito.any()))
-           .thenReturn(ImmutableList.of(task));
+    Mockito.when(jobsClientMock.getJobStatus(Mockito.any())).thenReturn(JobStatus.STOPPED);
+    Task task = new Task("localHost", "test-task", "0", new ArrayList<>(), ImmutableList.of("test-store"));
+    Mockito.when(jobsClientMock.getTasks(Mockito.any())).thenReturn(ImmutableList.of(task));
 
     localStoreMonitorMetrics = new LocalStoreMonitorMetrics("TestMonitorName", new NoOpMetricsRegistry());
 
     // Initialize the local store monitor with mock and config
-    localStoreMonitor = new LocalStoreMonitor(new LocalStoreMonitorConfig(new MapConfig(config)),
-                                              localStoreMonitorMetrics,
-                                              jobsClientMock);
+    localStoreMonitor =
+        new LocalStoreMonitor(new LocalStoreMonitorConfig(new MapConfig(config)), localStoreMonitorMetrics,
+            jobsClientMock);
   }
 
   @After
-  public void cleanUp()  {
-    // clean up the temp files created
+  public void cleanUp() {
+    // Clean up the entire temp local store directory and all files underneath it.
     try {
-      FileUtils.deleteDirectory(taskStoreDir);
+      FileUtils.deleteDirectory(localStoreDir);
     } catch (IOException e) {
       // Happens when task store can't be deleted after test finishes.
-      LOG.error("Deletion of directory: {} resulted in the exception: {}.", new Object[]{taskStoreDir, e});
+      LOG.error("Deletion of directory: {} resulted in the exception: {}.", new Object[]{localStoreDir, e});
       Assert.fail(e.getMessage());
     }
   }
 
-  // TODO fix in SAMZA-1243
-  // @Test
+  @Test
   public void shouldDeleteLocalTaskStoreWhenItHasNoOffsetFile() throws Exception {
     localStoreMonitor.monitor();
     assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
@@ -107,8 +113,7 @@ public class TestLocalStoreMonitor {
   }
 
   @Test
-  public void shouldDeleteLocalStoreWhenLastModifiedTimeOfOffsetFileIsGreaterThanOffsetTTL()
-      throws Exception {
+  public void shouldDeleteLocalStoreWhenLastModifiedTimeOfOffsetFileIsGreaterThanOffsetTTL() throws Exception {
     File offsetFile = createOffsetFile(taskStoreDir);
     offsetFile.setLastModified(0);
     localStoreMonitor.monitor();
@@ -140,21 +145,17 @@ public class TestLocalStoreMonitor {
 
   @Test
   public void shouldDoNothingWhenTheJobIsRunning() throws Exception {
-    Mockito.when(jobsClientMock.getJobStatus(Mockito.any()))
-           .thenReturn(JobStatus.STARTED);
+    Mockito.when(jobsClientMock.getJobStatus(Mockito.any())).thenReturn(JobStatus.STARTED);
     File offsetFile = createOffsetFile(taskStoreDir);
     localStoreMonitor.monitor();
     assertTrue("Offset file should exist.", offsetFile.exists());
     assertEquals(0, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
   }
 
-  // TODO fix in SAMZA-1243
-  // @Test
+  @Test
   public void shouldDeleteTaskStoreWhenTaskPreferredStoreIsNotLocalHost() throws Exception {
-    Task task = new Task("notLocalHost", "test-task", "0",
-                         new ArrayList<>(), ImmutableList.of("test-store"));
-    Mockito.when(jobsClientMock.getTasks(Mockito.any()))
-           .thenReturn(ImmutableList.of(task));
+    Task task = new Task("notLocalHost", "test-task", "0", new ArrayList<>(), ImmutableList.of("test-store"));
+    Mockito.when(jobsClientMock.getTasks(Mockito.any())).thenReturn(ImmutableList.of(task));
     localStoreMonitor.monitor();
     assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
     assertEquals(taskStoreSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
@@ -163,8 +164,7 @@ public class TestLocalStoreMonitor {
 
   @Test
   public void shouldContinueLocalStoreCleanUpAfterFailureToCleanUpStoreOfAJob() throws Exception {
-    File testFailingJobDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "samza-test-job/",
-                                   "test-jobName-jobId-1");
+    File testFailingJobDir = new File(localStoreDir, "test-jobName-jobId-1");
 
     File testFailingTaskStoreDir = new File(new File(testFailingJobDir, "test-store"), "test-task");
 
@@ -173,19 +173,19 @@ public class TestLocalStoreMonitor {
     // For job: test-jobName-jobId-1, throw up in getTasks call and
     // expect the cleanup to succeed for other job: test-jobName-jobId.
     Mockito.doThrow(new RuntimeException("Dummy exception message."))
-           .when(jobsClientMock).getTasks(new JobInstance("test-jobName","jobId-1"));
+        .when(jobsClientMock)
+        .getTasks(new JobInstance("test-jobName", "jobId-1"));
 
-    Task task = new Task("notLocalHost", "test-task", "0",
-                          new ArrayList<>(), ImmutableList.of("test-store"));
+    Task task = new Task("notLocalHost", "test-task", "0", new ArrayList<>(), ImmutableList.of("test-store"));
 
-    Mockito.when(jobsClientMock.getTasks(new JobInstance("test-jobName","jobId")))
-           .thenReturn(ImmutableList.of(task));
+    Mockito.when(jobsClientMock.getTasks(new JobInstance("test-jobName", "jobId"))).thenReturn(ImmutableList.of(task));
 
     Map<String, String> configMap = new HashMap<>(config);
     configMap.put(LocalStoreMonitorConfig.CONFIG_IGNORE_FAILURES, "true");
 
-    LocalStoreMonitor localStoreMonitor = new LocalStoreMonitor(new LocalStoreMonitorConfig(new MapConfig(configMap)),
-                                                                localStoreMonitorMetrics, jobsClientMock);
+    LocalStoreMonitor localStoreMonitor =
+        new LocalStoreMonitor(new LocalStoreMonitorConfig(new MapConfig(configMap)), localStoreMonitorMetrics,
+            jobsClientMock);
 
     localStoreMonitor.monitor();