You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2020/03/03 22:22:39 UTC
[samza] branch master updated: SAMZA-2464: Container shuts down when task fails to remove old state checkpoint dirs (#1283)
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f8bfe87 SAMZA-2464: Container shuts down when task fails to remove old state checkpoint dirs (#1283)
f8bfe87 is described below
commit f8bfe875e9866d350857cf99e5c2b0ddaf7d8ac1
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Tue Mar 3 14:22:29 2020 -0800
SAMZA-2464: Container shuts down when task fails to remove old state checkpoint dirs (#1283)
---
.../src/main/scala/org/apache/samza/container/TaskInstance.scala | 6 +++++-
.../test/scala/org/apache/samza/container/TestTaskInstance.scala | 6 ++----
2 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 2a4f1d6..37aaeff 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -274,7 +274,11 @@ class TaskInstance(
if (storageManager != null) {
trace("Remove old checkpoint stores for taskName: %s" format taskName)
- storageManager.removeOldCheckpoints(checkpointId)
+ try {
+ storageManager.removeOldCheckpoints(checkpointId)
+ } catch {
+ case e: Exception => error("Failed to remove old checkpoints for task: %s. Current checkpointId: %s" format (taskName, checkpointId), e)
+ }
}
if (inputCheckpoint != null) {
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index a54ae72..90f1b58 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -336,7 +336,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
}
@Test
- def testCommitFailsIfErrorClearingOldCheckpoints() { // required for transactional state
+ def testCommitContinuesIfErrorClearingOldCheckpoints() { // required for transactional state
val commitsCounter = mock[Counter]
when(this.metrics.commits).thenReturn(commitsCounter)
@@ -352,10 +352,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
} catch {
case e: SamzaException =>
// exception is expected, container should fail if could not get changelog offsets.
- return
+ fail("Exception from removeOldCheckpoints should have been caught")
}
-
- fail("Should have failed commit if error getting newest changelog offests")
}
/**