You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2019/02/13 15:39:17 UTC
[samza] branch master updated: SAMZA-2102: [samza-azure] code
cleanup and refactoring (#913)
This is an automated email from the ASF dual-hosted git repository.
jmakes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new ad5497a SAMZA-2102: [samza-azure] code cleanup and refactoring (#913)
ad5497a is described below
commit ad5497a6eb53febb8e45c75b8517e76aebfa7e0b
Author: Andrei Paikin <an...@gmail.com>
AuthorDate: Wed Feb 13 18:39:13 2019 +0300
SAMZA-2102: [samza-azure] code cleanup and refactoring (#913)
---
.../org/apache/samza/coordinator/AzureLock.java | 6 ++--
.../scheduler/LeaderLivenessCheckScheduler.java | 37 +++++++---------------
.../system/eventhub/EventHubClientManager.java | 2 +-
.../system/eventhub/admin/EventHubSystemAdmin.java | 23 ++++++--------
.../consumer/EventHubIncomingMessageEnvelope.java | 2 +-
.../eventhub/consumer/EventHubSystemConsumer.java | 22 ++++++-------
.../descriptors/EventHubsInputDescriptor.java | 14 ++++----
.../descriptors/EventHubsOutputDescriptor.java | 12 +++----
.../descriptors/EventHubsSystemDescriptor.java | 2 +-
.../main/java/org/apache/samza/util/BlobUtils.java | 25 ++++++---------
.../azure/ITestAzureCheckpointManager.java | 1 -
11 files changed, 59 insertions(+), 87 deletions(-)
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
index 172a0f3..8cddc4c 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
@@ -37,8 +37,8 @@ public class AzureLock implements DistributedLockWithState {
private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class);
private static final int LEASE_TIME_IN_SEC = 60;
- private AtomicBoolean hasLock;
- private AtomicReference<String> leaseId;
+ private final AtomicBoolean hasLock;
+ private final AtomicReference<String> leaseId;
private final LeaseBlobManager leaseBlobManager;
public AzureLock(BlobUtils blobUtils) {
@@ -97,4 +97,4 @@ public class AzureLock implements DistributedLockWithState {
LOG.info("Unable to unlock.");
}
}
-}
\ No newline at end of file
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
index e0fa448..59a8123 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
@@ -84,32 +84,19 @@ public class LeaderLivenessCheckScheduler implements TaskScheduler {
String currJMV = currentJMVersion.get();
String blobJMV = blob.getJobModelVersion();
//Get the leader processor row from the table.
- Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(currJMV);
- ProcessorEntity leader = null, nextLeader = null;
- for (ProcessorEntity entity: tableList) {
- if (entity.getIsLeader()) {
- leader = entity;
- break;
- }
- }
- int currJMVInt = 0;
- if (!currJMV.equals(initialState)) {
- currJMVInt = Integer.valueOf(currJMV);
- }
- if (Integer.valueOf(blobJMV) > currJMVInt) {
- for (ProcessorEntity entity : table.getEntitiesWithPartition(blobJMV)) {
- if (entity.getIsLeader()) {
- nextLeader = entity;
- break;
- }
- }
- }
+ ProcessorEntity leader = getLeader(currJMV);
+ int currJMVInt = currJMV.equals(initialState) ? 0 : Integer.valueOf(currJMV);
+ ProcessorEntity nextLeader = Integer.valueOf(blobJMV) > currJMVInt ? getLeader(blobJMV) : null;
// Check if row hasn't been updated since 30 seconds.
- if ((leader == null || (System.currentTimeMillis() - leader.getTimestamp().getTime() >= (
- LIVENESS_DEBOUNCE_TIME_SEC * 1000))) && nextLeader == null) {
- return false;
+ boolean leaderIsAlive = leader != null && (System.currentTimeMillis() - leader.getTimestamp().getTime() < (LIVENESS_DEBOUNCE_TIME_SEC * 1000));
+ return leaderIsAlive || nextLeader != null;
+ }
+
+ private ProcessorEntity getLeader(String jmv) {
+ for (ProcessorEntity entity: table.getEntitiesWithPartition(jmv)) {
+ if (entity.getIsLeader()) return entity;
}
- return true;
+ return null;
}
@Override
@@ -117,4 +104,4 @@ public class LeaderLivenessCheckScheduler implements TaskScheduler {
LOG.info("Shutting down LeaderLivenessCheckScheduler Scheduler.");
scheduler.shutdownNow();
}
-}
\ No newline at end of file
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
index 0b4f18f..fcd736c 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
@@ -40,7 +40,7 @@ public interface EventHubClientManager {
* denote that the close invocation should block until all the teardown
* operations for the {@link EventHubClient} are completed
*/
- public static int BLOCK_UNTIL_CLOSE = -1;
+ int BLOCK_UNTIL_CLOSE = -1;
/**
* Lifecycle hook to perform initializations for the creation of
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 27abe07..44353d2 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeoutException;
public class EventHubSystemAdmin implements SystemAdmin {
private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemAdmin.class);
- private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+ private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1).toMillis();
private final EventHubClientManagerFactory eventHubClientManagerFactory;
private final String systemName;
@@ -83,21 +83,18 @@ public class EventHubSystemAdmin implements SystemAdmin {
}
// PartitionRuntimeInformation does not implement toString()
- private String printPartitionRuntimeInfo(PartitionRuntimeInformation runtimeInformation) {
- if (runtimeInformation == null) {
+ private String printPartitionRuntimeInfo(PartitionRuntimeInformation runtimeInfo) {
+ if (runtimeInfo == null) {
return "[PartitionRuntimeInformation: null]";
}
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("[PartitionRuntimeInformation:");
- stringBuilder.append(" eventHubPath=").append(runtimeInformation.getEventHubPath());
- stringBuilder.append(" partitionId=").append(runtimeInformation.getPartitionId());
- stringBuilder.append(" lastEnqueuedTimeUtc=").append(runtimeInformation.getLastEnqueuedTimeUtc().toString());
- stringBuilder.append(" lastEnqueuedOffset=").append(runtimeInformation.getLastEnqueuedOffset());
// calculate the number of messages in the queue
- stringBuilder.append(" numMessages=")
- .append(runtimeInformation.getLastEnqueuedSequenceNumber() - runtimeInformation.getBeginSequenceNumber());
- stringBuilder.append("]");
- return stringBuilder.toString();
+ return "[PartitionRuntimeInformation:"
+ + " eventHubPath=" + runtimeInfo.getEventHubPath()
+ + " partitionId=" + runtimeInfo.getPartitionId()
+ + " lastEnqueuedTimeUtc=" + runtimeInfo.getLastEnqueuedTimeUtc()
+ + " lastEnqueuedOffset=" + runtimeInfo.getLastEnqueuedOffset()
+ + " numMessages=" + (runtimeInfo.getLastEnqueuedSequenceNumber() - runtimeInfo.getBeginSequenceNumber())
+ + "]";
}
@Override
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
index 8aa7480..bcd9c2d 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
@@ -27,7 +27,7 @@ import org.apache.samza.system.SystemStreamPartition;
* Extension of {@link IncomingMessageEnvelope} which contains {@link EventData} system and user properties metadata
*/
public class EventHubIncomingMessageEnvelope extends IncomingMessageEnvelope {
- private EventData eventData;
+ private final EventData eventData;
public EventHubIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key,
Object message, EventData eventData) {
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index df98d5b..887b3fe 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -104,7 +104,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemConsumer.class);
// Overall timeout for EventHubClient exponential backoff policy
- private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
+ private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10);
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofSeconds(15).toMillis();
public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1
@@ -394,28 +394,24 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
private synchronized void shutdownEventHubsManagers() {
// There could be potentially many Receivers and EventHubManagers, so close the managers in parallel
LOG.info("Start shutting down eventhubs receivers");
- ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver -> new Runnable() {
- @Override
- public void run() {
+ ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver ->
+ (Runnable) () -> {
try {
receiver.close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.error("Failed to shutdown receiver.", e);
}
- }
- }).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+ }).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
LOG.info("Start shutting down eventhubs managers");
- ShutdownUtil.boundedShutdown(perPartitionEventHubManagers.values().stream().map(manager -> new Runnable() {
- @Override
- public void run() {
+ ShutdownUtil.boundedShutdown(perPartitionEventHubManagers.values().stream().map(manager ->
+ (Runnable) () -> {
try {
manager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
} catch (Exception e) {
LOG.error("Failed to shutdown eventhubs manager.", e);
}
- }
- }).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+ }).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
perPartitionEventHubManagers.clear();
perStreamEventHubManagers.clear();
@@ -447,7 +443,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
private final Counter errorRate;
private final Interceptor interceptor;
private final Integer maxEventCount;
- SystemStreamPartition ssp;
+ private final SystemStreamPartition ssp;
PartitionReceiverHandlerImpl(SystemStreamPartition ssp, Counter eventReadRate, Counter eventByteReadRate,
SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor, int maxEventCount) {
@@ -521,7 +517,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
throwable);
try {
// Add a fixed delay so that we don't keep retrying when there are long-lasting failures
- Thread.sleep(Duration.ofSeconds(2).toMillis());
+ TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
LOG.warn("Interrupted during sleep before renew", e);
}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
index c8cc36b..de4e275 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
@@ -43,8 +43,8 @@ import org.apache.samza.system.eventhub.EventHubConfig;
*/
public class EventHubsInputDescriptor<StreamMessageType>
extends InputDescriptor<StreamMessageType, EventHubsInputDescriptor<StreamMessageType>> {
- private String namespace;
- private String entityPath;
+ private final String namespace;
+ private final String entityPath;
private Optional<String> sasKeyName = Optional.empty();
private Optional<String> sasToken = Optional.empty();
private Optional<String> consumerGroup = Optional.empty();
@@ -76,7 +76,7 @@ public class EventHubsInputDescriptor<StreamMessageType>
* @return this input descriptor
*/
public EventHubsInputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
- this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
+ this.sasKeyName = Optional.ofNullable(StringUtils.stripToNull(sasKeyName));
return this;
}
@@ -87,7 +87,7 @@ public class EventHubsInputDescriptor<StreamMessageType>
* @return this input descriptor
*/
public EventHubsInputDescriptor<StreamMessageType> withSasKey(String sasToken) {
- this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
+ this.sasToken = Optional.ofNullable(StringUtils.stripToNull(sasToken));
return this;
}
@@ -99,13 +99,13 @@ public class EventHubsInputDescriptor<StreamMessageType>
* @return this input descriptor
*/
public EventHubsInputDescriptor<StreamMessageType> withConsumerGroup(String consumerGroup) {
- this.consumerGroup = Optional.of(StringUtils.stripToNull(consumerGroup));
+ this.consumerGroup = Optional.ofNullable(StringUtils.stripToNull(consumerGroup));
return this;
}
@Override
public Map<String, String> toConfig() {
- HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
+ Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
String streamId = getStreamId();
@@ -116,7 +116,7 @@ public class EventHubsInputDescriptor<StreamMessageType>
ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId), keyName));
sasToken.ifPresent(key ->
ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId), key));
- this.consumerGroup.ifPresent(consumerGroupName ->
+ consumerGroup.ifPresent(consumerGroupName ->
ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId), consumerGroupName));
return ehConfigs;
}
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
index b3e1c59..2b3b469 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
@@ -43,8 +43,8 @@ import org.apache.samza.system.eventhub.EventHubConfig;
*/
public class EventHubsOutputDescriptor<StreamMessageType>
extends OutputDescriptor<StreamMessageType, EventHubsOutputDescriptor<StreamMessageType>> {
- private String namespace;
- private String entityPath;
+ private final String namespace;
+ private final String entityPath;
private Optional<String> sasKeyName = Optional.empty();
private Optional<String> sasToken = Optional.empty();
@@ -63,7 +63,7 @@ public class EventHubsOutputDescriptor<StreamMessageType>
this.namespace = StringUtils.stripToNull(namespace);
this.entityPath = StringUtils.stripToNull(entityPath);
if (this.namespace == null || this.entityPath == null) {
- throw new ConfigException(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
+ throw new ConfigException(String.format("Missing namespace and entity path Event Hubs output descriptor in "
+ "system: {%s}, stream: {%s}", getSystemName(), streamId));
}
}
@@ -75,7 +75,7 @@ public class EventHubsOutputDescriptor<StreamMessageType>
* @return this output descriptor
*/
public EventHubsOutputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
- this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
+ this.sasKeyName = Optional.ofNullable(StringUtils.stripToNull(sasKeyName));
return this;
}
@@ -86,13 +86,13 @@ public class EventHubsOutputDescriptor<StreamMessageType>
* @return this output descriptor
*/
public EventHubsOutputDescriptor<StreamMessageType> withSasKey(String sasToken) {
- this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
+ this.sasToken = Optional.ofNullable(StringUtils.stripToNull(sasToken));
return this;
}
@Override
public Map<String, String> toConfig() {
- HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
+ Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
String streamId = getStreamId();
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
index 2084018..94dbbbd 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
@@ -42,7 +42,7 @@ import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.Partitio
public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
- private List<String> streamIds = new ArrayList<>();
+ private final List<String> streamIds = new ArrayList<>();
private Optional<Integer> fetchRuntimeInfoTimeout = Optional.empty();
private Optional<Integer> numClientThreads = Optional.empty();
private Optional<Integer> consumerReceiveQueueSize = Optional.empty();
diff --git a/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
index 85e4273..2e4a27f 100644
--- a/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
@@ -129,8 +129,7 @@ public class BlobUtils {
LOG.error("Job Model details don't exist on the blob.");
return null;
}
- JobModel jm = jmBundle.getCurrJobModel();
- return jm;
+ return jmBundle.getCurrJobModel();
}
/**
@@ -146,8 +145,7 @@ public class BlobUtils {
LOG.error("Job Model details don't exist on the blob.");
return null;
}
- String jmVersion = jmBundle.getCurrJobModelVersion();
- return jmVersion;
+ return jmBundle.getCurrJobModelVersion();
}
/**
@@ -192,14 +190,12 @@ public class BlobUtils {
LOG.error("Failed to read barrier state from blob.", e);
throw new AzureException(e);
}
- String state;
try {
- state = SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
+ return SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
} catch (IOException e) {
- LOG.error("Failed to parse byte data: " + data + " for barrier state retrieved from the blob.", e);
+ LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for barrier state retrieved from the blob.", e);
throw new SamzaException(e);
}
- return state;
}
/**
@@ -242,14 +238,12 @@ public class BlobUtils {
LOG.error("Failed to read the list of live processors from the blob.", new AzureException(e));
throw new AzureException(e);
}
- List<String> list;
try {
- list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
+ return SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
} catch (IOException e) {
- LOG.error("Failed to parse byte data: " + data + " for live processor list retrieved from the blob", new SamzaException(e));
+ LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for live processor list retrieved from the blob", new SamzaException(e));
throw new SamzaException(e);
}
- return list;
}
public CloudBlobClient getBlobClient() {
@@ -273,12 +267,11 @@ public class BlobUtils {
throw new AzureException(e);
}
try {
- JobModelBundle jmBundle = SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
- return jmBundle;
+ return SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
} catch (IOException e) {
- LOG.error("Failed to parse byte data: " + data + " for JobModel details retrieved from the blob", e);
+ LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for JobModel details retrieved from the blob", e);
throw new SamzaException(e);
}
}
-}
\ No newline at end of file
+}
diff --git a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
index 4560b11..7cf01c0 100644
--- a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
+++ b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
@@ -19,7 +19,6 @@
package org.apache.samza.checkpoint.azure;
-import junit.framework.Assert;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;