You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/10/31 20:45:47 UTC
samza git commit: SAMZA-1943 Remove ExtendedSystemAdmin and
deprecated getNewestOffset method.
Repository: samza
Updated Branches:
refs/heads/master 9f60e96b1 -> 8723c3f79
SAMZA-1943 Remove ExtendedSystemAdmin and deprecated getNewestOffset method.
Author: Boris S <bo...@apache.org>
Author: Boris S <bs...@linkedin.com>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: Bharath Kumarasubramanian <bk...@linkedin.com>
Closes #782 from sborya/removeExtendedSystemAdmin
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8723c3f7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8723c3f7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8723c3f7
Branch: refs/heads/master
Commit: 8723c3f79f751870ebcece9b5618db009029d691
Parents: 9f60e96
Author: Boris S <bo...@apache.org>
Authored: Wed Oct 31 13:45:28 2018 -0700
Committer: Boris S <bs...@linkedin.com>
Committed: Wed Oct 31 13:45:28 2018 -0700
----------------------------------------------------------------------
.../samza/system/ExtendedSystemAdmin.java | 37 -----------
.../org/apache/samza/system/SystemAdmin.java | 12 ++++
.../apache/samza/system/TestSystemAdmin.java | 2 +-
.../samza/system/StreamMetadataCache.scala | 4 +-
.../apache/samza/system/MockSystemFactory.java | 9 +--
.../samza/checkpoint/TestCheckpointTool.scala | 2 +-
.../samza/system/kafka/KafkaSystemAdmin.java | 39 +-----------
.../kafka_deprecated/KafkaSystemAdmin.scala | 66 +-------------------
.../system/kafka/TestKafkaSystemAdmin.scala | 19 +++---
.../kafka_deprecated/TestKafkaSystemAdmin.scala | 41 ------------
10 files changed, 32 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
deleted file mode 100644
index ba239dc..0000000
--- a/samza-api/src/main/java/org/apache/samza/system/ExtendedSystemAdmin.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Interface extends the more generic SystemAdmin interface
- * TODO: Merge this interface method with SystemAdmin when we upgrade to JDK 1.8
- */
-public interface ExtendedSystemAdmin extends SystemAdmin {
- Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL);
-
- /**
- * Deprecated: Use {@link SystemAdmin#getSSPMetadata}, ideally combined with caching (i.e. SSPMetadataCache).
- * Makes fewer offset requests than getSystemStreamMetadata
- */
- @Deprecated
- String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 16f90e9..6ee7df2 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -144,4 +144,16 @@ public interface SystemAdmin {
}
+ /**
+ * Get partition counts only. Should be more efficient than getSystemStreamMetadata; if not overridden,
+ * this falls back to getSystemStreamMetadata.
+ * @param streamNames set of streams to query.
+ * @param cacheTTL cacheTTL to use if caching the values.
+ * @return A map from stream name to SystemStreamMetadata for each stream
+ * requested in the parameter set.
+ */
+ default Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
+ return getSystemStreamMetadata(streamNames);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java b/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java
index 85245e3..a725ce1 100644
--- a/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java
+++ b/samza-api/src/test/java/org/apache/samza/system/TestSystemAdmin.java
@@ -112,5 +112,5 @@ public class TestSystemAdmin {
* Looks like Mockito 1.x does not support using thenCallRealMethod with default methods for interfaces, but it works
* to use this placeholder abstract class.
*/
- private abstract class MySystemAdmin implements ExtendedSystemAdmin { }
+ private abstract class MySystemAdmin implements SystemAdmin { }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index 637858b..edffac7 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -62,8 +62,8 @@ class StreamMetadataCache (
.flatMap {
case (systemName, systemStreams) =>
val systemAdmin = systemAdmins.getSystemAdmin(systemName)
- val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
- systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms)
+ val streamToMetadata = if (partitionsMetadataOnly) {
+ systemAdmin.getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms)
} else {
systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream).asJava)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
index a9c57da..e3030ab 100644
--- a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
@@ -111,7 +111,7 @@ public class MockSystemFactory implements SystemFactory {
}
public SystemAdmin getAdmin(String systemName, Config config) {
- return new ExtendedSystemAdmin() {
+ return new SystemAdmin() {
@Override
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
@@ -161,12 +161,7 @@ public class MockSystemFactory implements SystemFactory {
public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
return getSystemStreamMetadata(streamNames);
}
-
- @Override
- public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) {
- return null;
- }
-
+
@Override
public boolean createStream(StreamSpec streamSpec) {
return true;
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index bed013c..4bb7adf 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -82,7 +82,7 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
).asJava)
TestCheckpointTool.checkpointManager = mock[CheckpointManager]
TestCheckpointTool.systemAdmin = mock[SystemAdmin]
- when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo").asJava))
+ when(TestCheckpointTool.systemAdmin.getSystemStreamPartitionCounts(Set("foo").asJava, 0))
.thenReturn(Map("foo" -> metadata).asJava)
when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0))
.thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234").asJava))
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index d2ceafb..16a2e67 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -43,9 +43,9 @@ import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.SystemConfig;
-import org.apache.samza.system.ExtendedSystemAdmin;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ExponentialSleepStrategy;
@@ -64,7 +64,7 @@ import scala.runtime.BoxedUnit;
import static org.apache.samza.config.KafkaConsumerConfig.*;
-public class KafkaSystemAdmin implements ExtendedSystemAdmin {
+public class KafkaSystemAdmin implements SystemAdmin {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemAdmin.class);
// Default exponential sleep strategy values
@@ -354,41 +354,6 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
return result;
}
- @Override
- public String getNewestOffset(SystemStreamPartition ssp, Integer maxRetries) {
- LOG.info("Fetching newest offset for: {}", ssp);
-
- ExponentialSleepStrategy strategy = new ExponentialSleepStrategy(DEFAULT_EXPONENTIAL_SLEEP_BACK_OFF_MULTIPLIER,
- DEFAULT_EXPONENTIAL_SLEEP_INITIAL_DELAY_MS, DEFAULT_EXPONENTIAL_SLEEP_MAX_DELAY_MS);
-
- Function1<ExponentialSleepStrategy.RetryLoop, String> fetchNewestOffset =
- new AbstractFunction1<ExponentialSleepStrategy.RetryLoop, String>() {
- @Override
- public String apply(ExponentialSleepStrategy.RetryLoop loop) {
- String result = fetchNewestOffset(ssp);
- loop.done();
- return result;
- }
- };
-
- String offset = strategy.run(fetchNewestOffset,
- new AbstractFunction2<Exception, ExponentialSleepStrategy.RetryLoop, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Exception exception, ExponentialSleepStrategy.RetryLoop loop) {
- if (loop.sleepCount() < maxRetries) {
- LOG.warn(String.format("Fetching newest offset for: %s threw an exception. Retrying.", ssp), exception);
- } else {
- LOG.error(String.format("Fetching newest offset for: %s threw an exception.", ssp), exception);
- loop.done();
- throw new SamzaException("Exception while trying to get newest offset", exception);
- }
- return null;
- }
- }).get();
-
- return offset;
- }
-
/**
* Convert TopicPartition to SystemStreamPartition
* @param topicPartition the topic partition to be created
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
index e7ff749..f16e507 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemAdmin.scala
@@ -154,7 +154,7 @@ class KafkaSystemAdmin(
/**
* Whether deleteMessages() API can be used
*/
- deleteCommittedMessages: Boolean = false) extends ExtendedSystemAdmin with Logging {
+ deleteCommittedMessages: Boolean = false) extends SystemAdmin with Logging {
import KafkaSystemAdmin._
@@ -303,70 +303,6 @@ class KafkaSystemAdmin(
}
/**
- * Returns the newest offset for the specified SSP.
- * This method is fast and targeted. It minimizes the number of kafka requests.
- * It does not retry indefinitely if there is any failure.
- * It returns null if the topic is empty. To get the offsets for *all*
- * partitions, it would be more efficient to call getSystemStreamMetadata
- */
- override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = {
- debug("Fetching newest offset for: %s" format ssp)
- var offset: String = null
- var metadataTTL = Long.MaxValue // Trust the cache until we get an exception
- var retries = maxRetries
- new ExponentialSleepStrategy().run(
- loop => {
- val metadata = TopicMetadataCache.getTopicMetadata(
- Set(ssp.getStream),
- systemName,
- getTopicMetadata,
- metadataTTL)
- debug("Got metadata for streams: %s" format metadata)
-
- val brokersToTopicPartitions = getTopicsAndPartitionsByBroker(metadata)
- val topicAndPartition = new TopicAndPartition(ssp.getStream, ssp.getPartition.getPartitionId)
- val broker = brokersToTopicPartitions.filter((e) => e._2.contains(topicAndPartition)).head._1
-
- // Get oldest, newest, and upcoming offsets for each topic and partition.
- debug("Fetching offset for %s:%s: %s" format (broker.host, broker.port, topicAndPartition))
- val consumer = new SimpleConsumer(broker.host, broker.port, timeout, bufferSize, clientId)
- try {
- offset = getOffsets(consumer, Set(topicAndPartition), OffsetRequest.LatestTime).head._2
-
- // Kafka's "latest" offset is always last message in stream's offset +
- // 1, so get newest message in stream by subtracting one. this is safe
- // even for key-deduplicated streams, since the last message will
- // never be deduplicated.
- if (offset.toLong <= 0) {
- debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
- offset = null
- } else {
- offset = (offset.toLong - 1).toString
- }
- } finally {
- consumer.close
- }
-
- debug("Got offset %s for %s." format(offset, ssp))
- loop.done
- },
-
- (exception, loop) => {
- if (retries > 0) {
- warn("Exception while trying to get offset for %s: %s. Retrying." format(ssp, exception))
- metadataTTL = 0L // Force metadata refresh
- retries -= 1
- } else {
- warn("Exception while trying to get offset for %s" format(ssp), exception)
- loop.done
- throw exception
- }
- })
-
- offset
- }
-
- /**
* Helper method to use topic metadata cache when fetching metadata, so we
* don't hammer Kafka more than we need to.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 1570363..095a1b0 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -21,6 +21,7 @@
package org.apache.samza.system.kafka
+import com.google.common.collect.ImmutableSet
import kafka.admin.AdminUtils
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
import kafka.integration.KafkaServerTestHarness
@@ -316,23 +317,25 @@ class TestKafkaSystemAdmin {
val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
- assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3))
- assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+ assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
+ assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
// Add a new message to one of the partitions, and verify that it works as expected.
assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString)
- assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3))
- assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+ assertEquals("0", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
+ assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
// Again
assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString)
- assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3))
- assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+ assertEquals("1", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
+ assertNull(systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
// Add a message to both partitions
assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString)
assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString)
- assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0))
- assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0))
+ assertEquals("2", systemAdmin.getSSPMetadata(ImmutableSet.of(sspUnderTest)).get(sspUnderTest).getNewestOffset)
+ assertEquals("0", systemAdmin.getSSPMetadata(ImmutableSet.of(otherSsp)).get(otherSsp).getNewestOffset)
+
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/8723c3f7/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
index bf64c03..93dad09 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
@@ -307,45 +307,4 @@ class TestKafkaSystemAdmin {
case e: ExponentialSleepStrategy.CallLimitReached => ()
}
}
-
- @Test
- def testGetNewestOffset {
- createTopic(TOPIC2, 16)
- validateTopic(TOPIC2, 16)
-
- val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
- val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
-
- assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3))
- assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
-
- // Add a new message to one of the partitions, and verify that it works as expected.
- assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString)
- assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3))
- assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
-
- // Again
- assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString)
- assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3))
- assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
-
- // Add a message to both partitions
- assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString)
- assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString)
- assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0))
- assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0))
- }
-
- @Test (expected = classOf[LeaderNotAvailableException])
- def testGetNewestOffsetMaxRetry {
- val expectedRetryCount = 3
- val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
- try {
- systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3)
- } catch {
- case e: Exception =>
- assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount)
- throw e
- }
- }
}