You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/06/15 20:40:55 UTC

samza git commit: SAMZA-1335: Improve logging for LocalStoreMonitor

Repository: samza
Updated Branches:
  refs/heads/master d1653aad0 -> e827d150f


SAMZA-1335: Improve logging for LocalStoreMonitor

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #226 from jmakes/samza-1335


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e827d150
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e827d150
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e827d150

Branch: refs/heads/master
Commit: e827d150fc12c81bb64788d28e07488c42a93687
Parents: d1653aa
Author: Jacob Maes <jm...@linkedin.com>
Authored: Thu Jun 15 13:40:43 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Thu Jun 15 13:40:43 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/monitor/LocalStoreMonitor.java | 20 +++++++++++++-------
 .../java/org/apache/samza/rest/model/Task.java  |  5 +++++
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e827d150/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index 8195491..8b25636 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -83,9 +83,10 @@ public class LocalStoreMonitor implements Monitor {
                              String.format("%s-%s", jobInstance.getJobName(), jobInstance.getJobId()));
       try {
         JobStatus jobStatus = jobsClient.getJobStatus(jobInstance);
+        LOG.info("Job: {} has the status: {}.", jobInstance, jobStatus);
         for (Task task : jobsClient.getTasks(jobInstance)) {
+          LOG.info("Evaluating stores for task: {}", task);
           for (String storeName : jobDir.list(DirectoryFileFilter.DIRECTORY)) {
-            LOG.info("Job: {} has the running status: {} with preferred host: {}.", jobInstance, jobStatus, task.getPreferredHost());
             /**
              *  A task store is active if all of the following conditions are true:
              *  a) If the store is amongst the active stores of the task.
@@ -95,9 +96,9 @@ public class LocalStoreMonitor implements Monitor {
             if (jobStatus.hasBeenStarted()
                 && task.getStoreNames().contains(storeName)
                 && task.getPreferredHost().equals(localHostName)) {
-              LOG.info(String.format("Store %s is actively used by the task: %s.", storeName, task.getTaskName()));
+              LOG.info(String.format("Local store: %s is actively used by the task: %s.", storeName, task.getTaskName()));
             } else {
-              LOG.info(String.format("Store %s not used by the task: %s.", storeName, task.getTaskName()));
+              LOG.info(String.format("Local store: %s not used by the task: %s.", storeName, task.getTaskName()));
               markSweepTaskStore(TaskStorageManager.getStorePartitionDir(jobDir, storeName, new TaskName(task.getTaskName())));
             }
           }
@@ -135,7 +136,7 @@ public class LocalStoreMonitor implements Monitor {
    * Role of this method is to garbage collect(mark-sweep) the task store.
    * @param taskStoreDir store directory of the task to perform garbage collection.
    *
-   *  This method cleans up each of the task store directory in two phases.
+   * This method cleans up the task store directory in two phases.
    *
    *  Phase 1:
    *  Delete the offset file in the task store if (curTime - lastModifiedTimeOfOffsetFile) > offsetTTL.
@@ -143,8 +144,13 @@ public class LocalStoreMonitor implements Monitor {
    *  Phase 2:
    *  Delete the task store directory if the offsetFile does not exist in task store directory.
    *
-   *  Time interval between the two phases is controlled by this monitor scheduling
-   *  interval in milli seconds.
+   * The separate phases are a safety precaution to prevent deleting a store that is currently being used.
+   * A running task will recreate the deleted offset file on its next commit. If a task that is not running,
+   * or is running on a different host, later gets moved to this host, it will not use a store without the offset file.
+   *
+   * The time interval between the two phases is controlled by this monitor's scheduling
+   * interval, in milliseconds.
+   *
    * @throws IOException if there is an exception during the clean up of the task store files.
    */
   private void markSweepTaskStore(File taskStoreDir) throws IOException {
@@ -158,7 +164,7 @@ public class LocalStoreMonitor implements Monitor {
       localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
     } else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= config.getOffsetFileTTL()) {
       LOG.info("Deleting the offset file from the store: {}, since the last modified timestamp: {} "
-                   + "of the offset file is older than config file ttl: {}.",
+                   + "is older than the configured ttl: {}.",
                   taskStorePath, offsetFile.lastModified(), config.getOffsetFileTTL());
       offsetFile.delete();
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/e827d150/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
index 94e8370..bb3c46c 100644
--- a/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
+++ b/samza-rest/src/main/java/org/apache/samza/rest/model/Task.java
@@ -134,4 +134,9 @@ public class Task {
     result = 31 * result + storeNames.hashCode();
     return result;
   }
+
+  @Override
+  public String toString() { // Single-line summary; rendered by the "{}" placeholder in LocalStoreMonitor's "Evaluating stores for task" log.
+    return String.format("taskName:%s container:%s preferredHost:%s stores:%s", taskName, containerId, preferredHost, storeNames.toString());
+  } // NOTE(review): storeNames.toString() assumes storeNames is non-null, as hashCode() above does.
 }