You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/17 05:19:41 UTC

samza git commit: SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fails

Repository: samza
Updated Branches:
  refs/heads/master 578cc19f6 -> e44b333f1


SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fails

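Tested locally and it works. The deleteMessagesCalled flag is now set only after adminClient.deleteRecordsBefore has been invoked, rather than at the start of deleteMessages.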

Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>

Reviewers: Jagadish <ja...@apache.org>

Closes #523 from nickpan47/fix-unittest-deleted-messages


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e44b333f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e44b333f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e44b333f

Branch: refs/heads/master
Commit: e44b333f17ffcc47bc20ac42daca5231aee19272
Parents: 578cc19
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Authored: Wed May 16 22:19:37 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed May 16 22:19:37 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e44b333f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index c76f6e5..a63db03 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -583,8 +583,6 @@ class KafkaSystemAdmin(
     * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
     */
   override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) {
-    deleteMessagesCalled = true
-
     if (!running) {
       throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName")
     }
@@ -593,6 +591,7 @@ class KafkaSystemAdmin(
         (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
       }.toMap
       adminClient.deleteRecordsBefore(nextOffsets)
+      deleteMessagesCalled = true
     }
   }