You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/17 05:19:41 UTC
samza git commit: SAMZA-1715: Unit test for Kafka admin
deletedMessagesCalled() fail
Repository: samza
Updated Branches:
refs/heads/master 578cc19f6 -> e44b333f1
SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fail
Test locally and works.
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Reviewers: Jagadish <ja...@apache.org>
Closes #523 from nickpan47/fix-unittest-deleted-messages
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e44b333f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e44b333f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e44b333f
Branch: refs/heads/master
Commit: e44b333f17ffcc47bc20ac42daca5231aee19272
Parents: 578cc19
Author: Yi Pan (Data Infrastructure) <yi...@yipan-mn1.linkedin.biz>
Authored: Wed May 16 22:19:37 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed May 16 22:19:37 2018 -0700
----------------------------------------------------------------------
.../scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/e44b333f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index c76f6e5..a63db03 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -583,8 +583,6 @@ class KafkaSystemAdmin(
* This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
*/
override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) {
- deleteMessagesCalled = true
-
if (!running) {
throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName")
}
@@ -593,6 +591,7 @@ class KafkaSystemAdmin(
(new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
}.toMap
adminClient.deleteRecordsBefore(nextOffsets)
+ deleteMessagesCalled = true
}
}