You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2020/03/03 22:22:39 UTC

[samza] branch master updated: SAMZA-2464: Container shuts down when task fails to remove old state checkpoint dirs (#1283)

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f8bfe87  SAMZA-2464: Container shuts down when task fails to remove old state checkpoint dirs (#1283)
f8bfe87 is described below

commit f8bfe875e9866d350857cf99e5c2b0ddaf7d8ac1
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Tue Mar 3 14:22:29 2020 -0800

    SAMZA-2464: Container shuts down when task fails to remove old state checkpoint dirs (#1283)
---
 .../src/main/scala/org/apache/samza/container/TaskInstance.scala    | 6 +++++-
 .../test/scala/org/apache/samza/container/TestTaskInstance.scala    | 6 ++----
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 2a4f1d6..37aaeff 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -274,7 +274,11 @@ class TaskInstance(
 
     if (storageManager != null) {
       trace("Remove old checkpoint stores for taskName: %s" format taskName)
-      storageManager.removeOldCheckpoints(checkpointId)
+      try {
+        storageManager.removeOldCheckpoints(checkpointId)
+      } catch {
+        case e: Exception => error("Failed to remove old checkpoints for task: %s. Current checkpointId: %s" format (taskName, checkpointId), e)
+      }
     }
 
     if (inputCheckpoint != null) {
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index a54ae72..90f1b58 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -336,7 +336,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
-  def testCommitFailsIfErrorClearingOldCheckpoints() { // required for transactional state
+  def testCommitContinuesIfErrorClearingOldCheckpoints() { // required for transactional state
     val commitsCounter = mock[Counter]
     when(this.metrics.commits).thenReturn(commitsCounter)
 
@@ -352,10 +352,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     } catch {
       case e: SamzaException =>
         // exception is expected, container should fail if could not get changelog offsets.
-        return
+        fail("Exception from removeOldCheckpoints should have been caught")
     }
-
-    fail("Should have failed commit if error getting newest changelog offests")
   }
 
   /**