You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/19 06:18:21 UTC

samza git commit: SAMZA-1209; Improve error handling in LocalStoreMonitor

Repository: samza
Updated Branches:
  refs/heads/master d50b1791b -> 1f3db9cc6


SAMZA-1209; Improve error handling in LocalStoreMonitor

Changes:

1. Add opt-in configuration to continue garbage collection of local stores
   when there's a failure in garbage collecting one local store.
2. Handle failures gracefully. In getJobModel, a JobModel is expected as the response.
   In case of failures, the returned error-message Map is deserialized to JobModel,
   resulting in a ClassCastException. Handle 2xx responses and failures separately.
   Properly log the returned error messages in the httpGet call.
3. Fix getTasks url format.
4. Minor code cleanup (switch to using ',' as the separator for the job.status.servers
   configuration instead of '.', for ease of use).
5. Fix disabled tests in TestLocalStoreMonitor.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Closes #124 from shanthoosh/fix_config_format_in_localstore_monitor


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1f3db9cc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1f3db9cc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1f3db9cc

Branch: refs/heads/master
Commit: 1f3db9cc602f7bdd779afd7f68039f899bf9d2db
Parents: d50b179
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue Apr 18 23:18:13 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Tue Apr 18 23:18:13 2017 -0700

----------------------------------------------------------------------
 .../org/apache/samza/monitor/JobsClient.java    | 44 ++++++++------
 .../apache/samza/monitor/LocalStoreMonitor.java | 48 ++++++++-------
 .../samza/monitor/LocalStoreMonitorConfig.java  | 17 +++++-
 .../samza/rest/resources/ResourceConstants.java |  2 +-
 .../samza/monitor/TestLocalStoreMonitor.java    | 62 +++++++++++++++++---
 5 files changed, 125 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1f3db9cc/samza-rest/src/main/java/org/apache/samza/monitor/JobsClient.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/JobsClient.java b/samza-rest/src/main/java/org/apache/samza/monitor/JobsClient.java
index 1e247f6..8d82600 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/JobsClient.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/JobsClient.java
@@ -20,11 +20,13 @@ package org.apache.samza.monitor;
 
 import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Function;
+
+import org.apache.commons.codec.binary.StringUtils;
 import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.samza.SamzaException;
 import org.apache.samza.rest.model.Job;
@@ -67,8 +69,8 @@ public class JobsClient {
    * @throws SamzaException if there were any problems with the http request.
    */
   public List<Task> getTasks(JobInstance jobInstance) {
-    return retriableHttpGet(baseUrl -> String.format(ResourceConstants.GET_TASKS_URL, baseUrl,
-        jobInstance.getJobName(), jobInstance.getJobId()));
+    return queryJobStatusServers(baseUrl -> String.format(ResourceConstants.GET_TASKS_URL, baseUrl,
+        jobInstance.getJobName(), jobInstance.getJobId()), new TypeReference<List<Task>>(){});
   }
 
   /**
@@ -78,8 +80,8 @@ public class JobsClient {
    * @throws SamzaException if there are any problems with the http request.
    */
   public JobStatus getJobStatus(JobInstance jobInstance) {
-    Job job = retriableHttpGet(baseUrl -> String.format(ResourceConstants.GET_JOBS_URL, baseUrl,
-        jobInstance.getJobName(), jobInstance.getJobId()));
+    Job job = queryJobStatusServers(baseUrl -> String.format(ResourceConstants.GET_JOBS_URL, baseUrl,
+        jobInstance.getJobName(), jobInstance.getJobId()), new TypeReference<Job>(){});
     return job.getStatus();
   }
 
@@ -90,41 +92,47 @@ public class JobsClient {
    * When a job status server is down or returns a error response, it tries to reach out to
    * the next job status server in the sequence, to complete the http get request.
    *
-   * @param urlMapFunction to build the request url, given job status server base url.
+   * @param requestUrlBuilder to build the request url, given job status server base url.
    * @param <T> return type of the http get response.
    * @return the response from any one of the job status server.
    * @throws Exception when all the job status servers are unavailable.
    *
    */
-  private <T> T retriableHttpGet(Function<String, String> urlMapFunction) {
-    Exception fetchException = null;
+  private <T> T queryJobStatusServers(Function<String, String> requestUrlBuilder, TypeReference<T> typeReference) {
+    SamzaException fetchException = null;
     for (String jobStatusServer : jobStatusServers) {
-      String requestUrl = urlMapFunction.apply(jobStatusServer);
+      String requestUrl = requestUrlBuilder.apply(jobStatusServer);
       try {
         ObjectMapper objectMapper = new ObjectMapper();
-        return objectMapper.readValue(httpGet(requestUrl), new TypeReference<T>() {});
+        byte[] response = httpGet(requestUrl);
+        return objectMapper.readValue(response, typeReference);
       } catch (Exception e) {
-        LOG.error(String.format("Exception when fetching tasks from the url : %s", requestUrl), e);
-        fetchException = e;
+        String exceptionMessage = String.format("Exception in http get request from url: %s.", requestUrl);
+        LOG.error(exceptionMessage, e);
+        fetchException = new SamzaException(exceptionMessage, e);
       }
     }
-    throw new SamzaException(String.format("Exception during http get from urls : %s", jobStatusServers),
-        fetchException);
+    throw fetchException;
   }
 
   /**
    * This method initiates http get request on the request url and returns the
    * response returned from the http get.
    * @param requestUrl url on which the http get request has to be performed.
-   * @return the input stream of the http get response.
+   * @return the http get response.
    * @throws IOException if there are problems with the http get request.
    */
-  private InputStream httpGet(String requestUrl) throws IOException {
+  private byte[] httpGet(String requestUrl) throws IOException {
     GetMethod getMethod = new GetMethod(requestUrl);
     try {
       int responseCode = httpClient.executeMethod(getMethod);
-      LOG.debug("Received response code {} for the get request on the url : {}", responseCode, requestUrl);
-      return getMethod.getResponseBodyAsStream();
+      LOG.debug("Received response code: {} for the get request on the url: {}", responseCode, requestUrl);
+      byte[] response = getMethod.getResponseBody();
+      if (responseCode != HttpStatus.SC_OK) {
+        throw new SamzaException(String.format("Received response code: %s for get request on: %s, with message: %s.",
+                                               responseCode, requestUrl, StringUtils.newStringUtf8(response)));
+      }
+      return response;
     } finally {
       getMethod.releaseConnection();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f3db9cc/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index 2c4ac95..8195491 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -81,25 +81,33 @@ public class LocalStoreMonitor implements Monitor {
     for (JobInstance jobInstance : getHostAffinityEnabledJobs(localStoreDir)) {
       File jobDir = new File(localStoreDir,
                              String.format("%s-%s", jobInstance.getJobName(), jobInstance.getJobId()));
-      JobStatus jobStatus = jobsClient.getJobStatus(jobInstance);
-      for (Task task : jobsClient.getTasks(jobInstance)) {
-        for (String storeName : jobDir.list(DirectoryFileFilter.DIRECTORY)) {
-          LOG.info("Job: {} has the running status: {} with preferred host:  {}", jobInstance, jobStatus, task.getPreferredHost());
-          /**
-           *  A task store is active if all of the following conditions are true:
-           *  a) If the store is amongst the active stores of the task.
-           *  b) If the job has been started.
-           *  c) If the preferred host of the task is the localhost on which the monitor is run.
-           */
-          if (jobStatus.hasBeenStarted()
-              && task.getStoreNames().contains(storeName)
-              && task.getPreferredHost().equals(localHostName)) {
-            LOG.info(String.format("Store %s is actively used by the task: %s.", storeName, task.getTaskName()));
-          } else {
-            LOG.info(String.format("Store %s not used by the task: %s.", storeName, task.getTaskName()));
-            markSweepTaskStore(TaskStorageManager.getStorePartitionDir(jobDir, storeName, new TaskName(task.getTaskName())));
+      try {
+        JobStatus jobStatus = jobsClient.getJobStatus(jobInstance);
+        for (Task task : jobsClient.getTasks(jobInstance)) {
+          for (String storeName : jobDir.list(DirectoryFileFilter.DIRECTORY)) {
+            LOG.info("Job: {} has the running status: {} with preferred host: {}.", jobInstance, jobStatus, task.getPreferredHost());
+            /**
+             *  A task store is active if all of the following conditions are true:
+             *  a) If the store is amongst the active stores of the task.
+             *  b) If the job has been started.
+             *  c) If the preferred host of the task is the localhost on which the monitor is run.
+             */
+            if (jobStatus.hasBeenStarted()
+                && task.getStoreNames().contains(storeName)
+                && task.getPreferredHost().equals(localHostName)) {
+              LOG.info(String.format("Store %s is actively used by the task: %s.", storeName, task.getTaskName()));
+            } else {
+              LOG.info(String.format("Store %s not used by the task: %s.", storeName, task.getTaskName()));
+              markSweepTaskStore(TaskStorageManager.getStorePartitionDir(jobDir, storeName, new TaskName(task.getTaskName())));
+            }
           }
         }
+      } catch (Exception ex) {
+        if (!config.getIgnoreFailures()) {
+          throw ex;
+        }
+        LOG.warn("Config: {} turned on, failures will be ignored. Local store cleanup for job: {} resulted in exception: {}.",
+                 new Object[]{LocalStoreMonitorConfig.CONFIG_IGNORE_FAILURES, jobInstance, ex});
       }
     }
   }
@@ -143,14 +151,14 @@ public class LocalStoreMonitor implements Monitor {
     String taskStorePath = taskStoreDir.getAbsolutePath();
     File offsetFile = new File(taskStoreDir, OFFSET_FILE_NAME);
     if (!offsetFile.exists()) {
-      LOG.info("Deleting the task store : {}, since it has no offset file.", taskStorePath);
+      LOG.info("Deleting the task store: {}, since it has no offset file.", taskStorePath);
       long taskStoreSizeInBytes = taskStoreDir.getTotalSpace();
       FileUtils.deleteDirectory(taskStoreDir);
       localStoreMonitorMetrics.diskSpaceFreedInBytes.inc(taskStoreSizeInBytes);
       localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
     } else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= config.getOffsetFileTTL()) {
-      LOG.info("Deleting the offset file from the store : {}, since the last modified timestamp : {} "
-                   + "of the offset file is older than config file ttl : {}.",
+      LOG.info("Deleting the offset file from the store: {}, since the last modified timestamp: {} "
+                   + "of the offset file is older than config file ttl: {}.",
                   taskStorePath, offsetFile.lastModified(), config.getOffsetFileTTL());
       offsetFile.delete();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f3db9cc/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
index 5e50f7d..8413194 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
@@ -53,6 +53,12 @@ public class LocalStoreMonitorConfig extends MapConfig {
    */
   private static final long DEFAULT_OFFSET_FILE_TTL_MS = 1000 * 60 * 60 * 24 * 7;
 
+  /**
+   * Will make LocalStoreMonitor ignore failures during store clean ups.
+   * By default, this is turned off.
+   */
+  public static final String CONFIG_IGNORE_FAILURES = "ignore.failures";
+
   public LocalStoreMonitorConfig(Config config) {
     super(config);
   }
@@ -80,6 +86,15 @@ public class LocalStoreMonitorConfig extends MapConfig {
    *         on the job status server.
    */
   public List<String> getJobStatusServers() {
-     return Arrays.asList(StringUtils.split(get(CONFIG_JOB_STATUS_SERVERS), '.'));
+     return Arrays.asList(StringUtils.split(get(CONFIG_JOB_STATUS_SERVERS), ','));
+  }
+
+  /**
+   * Determines if failures in store cleanup of individual jobs should be ignored in {@link LocalStoreMonitor}.
+   * @return true, if store cleanup failures should be ignored in {@link LocalStoreMonitor} implementation.
+   *         false, otherwise.
+   */
+  public boolean getIgnoreFailures() {
+    return getBoolean(CONFIG_IGNORE_FAILURES, false);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f3db9cc/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceConstants.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceConstants.java b/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceConstants.java
index 933edde..5658623 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceConstants.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/resources/ResourceConstants.java
@@ -20,7 +20,7 @@ package org.apache.samza.rest.resources;
 
 public class ResourceConstants {
 
-  public static final String GET_TASKS_URL = "http://%s/%s/%s/tasks/";
+  public static final String GET_TASKS_URL = "http://%s/v1/jobs/%s/%s/tasks/";
 
   public static final String GET_JOBS_URL = "http://%s/v1/jobs/%s/%s/";
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1f3db9cc/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index d0b6962..2d1681c 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -21,17 +21,23 @@ package org.apache.samza.monitor;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.model.Task;
+import org.apache.samza.rest.proxy.job.JobInstance;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertTrue;
@@ -41,6 +47,8 @@ public class TestLocalStoreMonitor {
   private static File jobDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "samza-test-job/",
                                         "test-jobName-jobId");
 
+  private static Logger LOG = LoggerFactory.getLogger(TestLocalStoreMonitor.class);
+
   private File taskStoreDir = new File(new File(jobDir, "test-store"), "test-task");
 
   private Map<String, String> config = ImmutableMap.of(LocalStoreMonitorConfig.CONFIG_LOCAL_STORE_DIR,
@@ -78,18 +86,23 @@ public class TestLocalStoreMonitor {
   }
 
   @After
-  public void cleanUp() throws Exception {
+  public void cleanUp()  {
     // clean up the temp files created
-    FileUtils.deleteDirectory(taskStoreDir);
+    try {
+      FileUtils.deleteDirectory(taskStoreDir);
+    } catch (IOException e) {
+      // Happens when task store can't be deleted after test finishes.
+      LOG.error("Deletion of directory: {} resulted in the exception: {}.", new Object[]{taskStoreDir, e});
+      Assert.fail(e.getMessage());
+    }
   }
 
-  // TODO: Fix in SAMZA-1183
-  //@Test
+  @Test
   public void shouldDeleteLocalTaskStoreWhenItHasNoOffsetFile() throws Exception {
     localStoreMonitor.monitor();
     assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
     assertEquals(taskStoreSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
-    assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
+    assertEquals(1, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
   }
 
   @Test
@@ -113,6 +126,7 @@ public class TestLocalStoreMonitor {
     assertTrue("Inactive task store directory should not exist.", !inActiveTaskDir.exists());
     assertEquals(taskStoreSize + inActiveTaskDirSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
     assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
+    FileUtils.deleteDirectory(inActiveStoreDir);
   }
 
   @Test
@@ -133,8 +147,7 @@ public class TestLocalStoreMonitor {
     assertEquals(0, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
   }
 
-  // TODO: Fix in SAMZA-1183
-  //@Test
+  @Test
   public void shouldDeleteTaskStoreWhenTaskPreferredStoreIsNotLocalHost() throws Exception {
     Task task = new Task("notLocalHost", "test-task", "0",
                          new ArrayList<>(), ImmutableList.of("test-store"));
@@ -143,7 +156,40 @@ public class TestLocalStoreMonitor {
     localStoreMonitor.monitor();
     assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
     assertEquals(taskStoreSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
-    assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
+    assertEquals(1, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
+  }
+
+  @Test
+  public void shouldContinueLocalStoreCleanUpAfterFailureToCleanUpStoreOfAJob() throws Exception {
+    File testFailingJobDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "samza-test-job/",
+                                   "test-jobName-jobId-1");
+
+    File testFailingTaskStoreDir = new File(new File(testFailingJobDir, "test-store"), "test-task");
+
+    FileUtils.forceMkdir(testFailingTaskStoreDir);
+
+    // For job: test-jobName-jobId-1, throw up in getTasks call and
+    // expect the cleanup to succeed for other job: test-jobName-jobId.
+    Mockito.doThrow(new RuntimeException("Dummy exception message."))
+           .when(jobsClientMock).getTasks(new JobInstance("test-jobName","jobId-1"));
+
+    Task task = new Task("notLocalHost", "test-task", "0",
+                          new ArrayList<>(), ImmutableList.of("test-store"));
+
+    Mockito.when(jobsClientMock.getTasks(new JobInstance("test-jobName","jobId")))
+           .thenReturn(ImmutableList.of(task));
+
+    Map<String, String> configMap = new HashMap<>(config);
+    configMap.put(LocalStoreMonitorConfig.CONFIG_IGNORE_FAILURES, "true");
+
+    LocalStoreMonitor localStoreMonitor = new LocalStoreMonitor(new LocalStoreMonitorConfig(new MapConfig(configMap)),
+                                                                localStoreMonitorMetrics, jobsClientMock);
+
+    localStoreMonitor.monitor();
+
+    // Non failing job directory should be cleaned up.
+    assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
+    FileUtils.deleteDirectory(testFailingJobDir);
   }
 
   private static File createOffsetFile(File taskStoreDir) throws Exception {