You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/11/08 01:26:02 UTC

samza git commit: SAMZA-1486; Checkpoint manager implementation with Azure Table

Repository: samza
Updated Branches:
  refs/heads/master 05113c339 -> adfc4bfc4


SAMZA-1486; Checkpoint manager implementation with Azure Table

vjagadish1989

Author: Daniel Chen <29...@users.noreply.github.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #341 from dxichen/azure-checkpoint-manager


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adfc4bfc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adfc4bfc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adfc4bfc

Branch: refs/heads/master
Commit: adfc4bfc40a5d92b3cc7e7813dd1a73c0c486e0e
Parents: 05113c3
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Tue Nov 7 17:25:58 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 7 17:25:58 2017 -0800

----------------------------------------------------------------------
 .../samza/checkpoint/CheckpointManager.java     |   6 +-
 .../azure/AzureCheckpointManager.java           | 236 +++++++++++++++++++
 .../azure/AzureCheckpointManagerFactory.java    |  33 +++
 .../checkpoint/azure/TaskCheckpointEntity.java  |  43 ++++
 .../org/apache/samza/config/AzureConfig.java    |   2 +-
 .../coordinator/AzureCoordinationUtils.java     |   2 +-
 .../samza/coordinator/AzureJobCoordinator.java  |   2 +-
 .../azure/ITestAzureCheckpointManager.java      | 181 ++++++++++++++
 8 files changed, 501 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 10f166c..bc75351 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -44,10 +44,14 @@ public interface CheckpointManager {
  /**
    * Returns the last recorded checkpoint for a specified taskName.
    * @param taskName Specific Samza taskName for which to get the last checkpoint of.
-   * @return A Checkpoint object with the recorded offset data of the specified partition.
+   * @return A Checkpoint object with the recorded offset data of the specified task
+   *         or null if there are no recorded checkpoints for the task.
    */
   Checkpoint readLastCheckpoint(TaskName taskName);
 
+  /**
+   * Performs teardown operations for the manager. Checkpoints already written remain persisted.
+   */
   void stop();
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
new file mode 100644
index 0000000..df3e490
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
@@ -0,0 +1,283 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
+import com.google.common.collect.ImmutableMap;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.table.*;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Azure checkpoint manager is used to store checkpoints in an Azure Table.
+ * All the task checkpoints are added to a single table named "SamzaTaskCheckpoints".
+ * The table entities take the following form:
+ *
+ * +-----------------+---------------------+-------------------+
+ * |                 |     Serialized      |                   |
+ * |   TaskName      |     JSON SSP        |     Offset        |
+ * |                 |                     |                   |
+ * +-----------------+---------------------+-------------------+
+ *
+ * Each entity has its partitionKey set to the TaskName and its rowKey set to the serialized SSP.
+ *
+ * <p>Checkpoints with more than {@link #MAX_WRITE_BATCH_SIZE} offsets are written as several
+ * batch operations and are therefore not written atomically as a whole.
+ *
+ * <p>NOTE(review): Azure Table keys may not contain '/', '\', '#' or '?'. The row key is raw
+ * JSON, so SSPs whose system or stream names contain those characters presumably cannot be
+ * stored; confirm whether the row key should be encoded.
+ */
+public class AzureCheckpointManager implements CheckpointManager {
+  private static final Logger LOG = LoggerFactory.getLogger(AzureCheckpointManager.class.getName());
+  private static final String PARTITION_KEY = "PartitionKey";
+
+  /** Maximum number of operations submitted in a single Azure Table batch operation. */
+  public static final int MAX_WRITE_BATCH_SIZE = 100;
+  public static final String CHECKPOINT_MANAGER_TABLE_NAME = "SamzaTaskCheckpoints";
+  public static final String SYSTEM_PROP_NAME = "system";
+  public static final String STREAM_PROP_NAME = "stream";
+  public static final String PARTITION_PROP_NAME = "partition";
+
+  private final AzureClient azureClient;
+  private CloudTable cloudTable;
+
+  private final Set<TaskName> taskNames = new HashSet<>();
+  private final JsonSerdeV2<Map<String, String>> jsonSerde = new JsonSerdeV2<>();
+
+  AzureCheckpointManager(AzureConfig azureConfig) {
+    azureClient = new AzureClient(azureConfig.getAzureConnectionString());
+  }
+
+  /**
+   * Gets a reference to the checkpoint table, creating the table if it does not exist yet.
+   *
+   * @throws AzureException if the connection string yields an invalid URI or the storage call fails
+   */
+  @Override
+  public void start() {
+    try {
+      cloudTable = azureClient.getTableClient().getTableReference(CHECKPOINT_MANAGER_TABLE_NAME);
+      cloudTable.createIfNotExists();
+    } catch (URISyntaxException e) {
+      // Don't log the connection string: it may contain credentials such as the account key.
+      LOG.error("Connection string specifies an invalid URI while creating checkpoint table.", e);
+      throw new AzureException(e);
+    } catch (StorageException e) {
+      LOG.error("Azure Storage failed when creating checkpoint table", e);
+      throw new AzureException(e);
+    }
+  }
+
+  /** Records {@code taskName} so that its checkpoints may be written, read and cleared. */
+  @Override
+  public void register(TaskName taskName) {
+    taskNames.add(taskName);
+  }
+
+  /**
+   * Upserts one table row per SSP offset in {@code checkpoint}, under the task's partition key,
+   * executing at most {@link #MAX_WRITE_BATCH_SIZE} operations per batch.
+   *
+   * @throws SamzaException if {@code taskName} was not registered
+   * @throws AzureException if executing a batch fails; earlier batches remain persisted
+   */
+  @Override
+  public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
+    if (!taskNames.contains(taskName)) {
+      throw new SamzaException("writing checkpoint of unregistered task");
+    }
+
+    TableBatchOperation batchOperation = new TableBatchOperation();
+
+    Iterator<Map.Entry<SystemStreamPartition, String>> iterator = checkpoint.getOffsets().entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<SystemStreamPartition, String> entry = iterator.next();
+      SystemStreamPartition ssp = entry.getKey();
+      String offset = entry.getValue();
+
+      // Create table entity
+      TaskCheckpointEntity taskCheckpoint = new TaskCheckpointEntity(taskName.toString(),
+              serializeSystemStreamPartition(ssp), offset);
+
+      // Add to batch operation (insertOrReplace overwrites any previous offset for this SSP)
+      batchOperation.insertOrReplace(taskCheckpoint);
+
+      // Execute when batch reaches capacity or this is the last item
+      if (batchOperation.size() >= MAX_WRITE_BATCH_SIZE || !iterator.hasNext()) {
+        try {
+          cloudTable.execute(batchOperation);
+        } catch (StorageException e) {
+          LOG.error("Executing batch failed for task: {}", taskName);
+          throw new AzureException(e);
+        }
+        batchOperation.clear();
+      }
+    }
+  }
+
+  /**
+   * Serializes {@code ssp} into the JSON string used as the table row key.
+   */
+  private String serializeSystemStreamPartition(SystemStreamPartition ssp) {
+    // Create the Json string for SystemStreamPartition
+    Map<String, String> sspMap = new HashMap<>();
+
+    sspMap.put(SYSTEM_PROP_NAME, ssp.getSystem());
+    sspMap.put(STREAM_PROP_NAME, ssp.getStream());
+    sspMap.put(PARTITION_PROP_NAME, String.valueOf(ssp.getPartition().getPartitionId()));
+
+    // Explicit charset: the platform default could differ between the writing and reading host.
+    return new String(jsonSerde.toBytes(sspMap), StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Inverse of {@link #serializeSystemStreamPartition}: rebuilds the SSP from a row key.
+   */
+  private SystemStreamPartition deserializeSystemStreamPartition(String serializedSSP) {
+    Map<String, String> sspPropertiesMap = jsonSerde.fromBytes(serializedSSP.getBytes(StandardCharsets.UTF_8));
+
+    String systemName = sspPropertiesMap.get(SYSTEM_PROP_NAME);
+    String streamName = sspPropertiesMap.get(STREAM_PROP_NAME);
+    Partition partition = new Partition(Integer.parseInt(sspPropertiesMap.get(PARTITION_PROP_NAME)));
+
+    return new SystemStreamPartition(systemName, streamName, partition);
+  }
+
+  /**
+   * Reads every SSP offset stored under {@code taskName}'s partition key.
+   *
+   * @return the task's checkpoint, or {@code null} if no offsets are recorded for the task
+   * @throws SamzaException if {@code taskName} was not registered
+   * @throws AzureException if querying the table fails
+   */
+  @Override
+  public Checkpoint readLastCheckpoint(TaskName taskName) {
+    if (!taskNames.contains(taskName)) {
+      throw new SamzaException("reading checkpoint of unregistered/unwritten task");
+    }
+
+    // Create the query for taskName
+    String partitionQueryKey = taskName.toString();
+    String partitionFilter = TableQuery.generateFilterCondition(
+            PARTITION_KEY,
+            TableQuery.QueryComparisons.EQUAL,
+            partitionQueryKey);
+    TableQuery<TaskCheckpointEntity> query = TableQuery.from(TaskCheckpointEntity.class).where(partitionFilter);
+
+    ImmutableMap.Builder<SystemStreamPartition, String> builder = ImmutableMap.builder();
+    try {
+      for (TaskCheckpointEntity taskCheckpointEntity : cloudTable.execute(query)) {
+        // Recreate the SSP offset
+        String serializedSSP = taskCheckpointEntity.getRowKey();
+        builder.put(deserializeSystemStreamPartition(serializedSSP), taskCheckpointEntity.getOffset());
+      }
+    } catch (NoSuchElementException e) {
+      // An empty partition simply ends the loop. NOTE(review): the SDK's lazy query iterator appears
+      // to report failed requests as NoSuchElementException (cause: StorageException), so this is an
+      // error, not "no checkpoint"; confirm against the azure-storage version in use.
+      LOG.error("Reading checkpoints failed for taskName={}", taskName, e);
+      throw new AzureException(e);
+    }
+
+    Map<SystemStreamPartition, String> offsets = builder.build();
+    if (offsets.isEmpty()) {
+      LOG.warn("No checkpoints found for registered taskName={}", taskName);
+      return null;
+    }
+    LOG.debug("Received checkpoint state for taskName={}", taskName);
+    return new Checkpoint(offsets);
+  }
+
+  /** Nothing needs releasing; checkpoints stay persisted in the table. */
+  @Override
+  public void stop() {
+    // Nothing to do here
+  }
+
+  /**
+   * Deletes the stored checkpoints of every registered task. Partitions of tasks that were not
+   * registered with this manager are left untouched.
+   */
+  @Override
+  public void clearCheckpoints() {
+    LOG.debug("Clearing all checkpoints in Azure table");
+
+    for (TaskName taskName : taskNames) {
+      String partitionQueryKey = taskName.toString();
+
+      // Generate table query
+      String partitionFilter = TableQuery.generateFilterCondition(
+              PARTITION_KEY,
+              TableQuery.QueryComparisons.EQUAL,
+              partitionQueryKey);
+      TableQuery<TaskCheckpointEntity> partitionQuery = TableQuery.from(TaskCheckpointEntity.class)
+              .where(partitionFilter);
+
+      // All entities in a given batch must have the same partition key
+      deleteEntities(cloudTable.execute(partitionQuery).iterator());
+    }
+  }
+
+  /**
+   * Deletes {@code entitiesToDelete} in batches of at most {@link #MAX_WRITE_BATCH_SIZE}.
+   * Callers must supply entities that share one partition key.
+   */
+  private void deleteEntities(Iterator<TaskCheckpointEntity> entitiesToDelete) {
+    TableBatchOperation batchOperation = new TableBatchOperation();
+
+    while (entitiesToDelete.hasNext()) {
+      TaskCheckpointEntity entity = entitiesToDelete.next();
+
+      // Add to batch operation
+      batchOperation.delete(entity);
+
+      // Execute when batch reaches capacity or when this is the last item
+      if (batchOperation.size() >= MAX_WRITE_BATCH_SIZE || !entitiesToDelete.hasNext()) {
+        try {
+          cloudTable.execute(batchOperation);
+        } catch (StorageException e) {
+          LOG.error("Executing batch failed for deleting checkpoints");
+          throw new AzureException(e);
+        }
+        batchOperation.clear();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
new file mode 100644
index 0000000..3c5d62a
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
@@ -0,0 +1,41 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Factory that creates an {@link AzureCheckpointManager} from job configuration.
+ */
+public class AzureCheckpointManagerFactory implements CheckpointManagerFactory {
+  /**
+   * Builds a checkpoint manager that uses the Azure storage connection settings in {@code config}.
+   * The {@code registry} argument is not used.
+   */
+  @Override
+  public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry) {
+    AzureConfig azureConfig = new AzureConfig(config);
+    return new AzureCheckpointManager(azureConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
new file mode 100644
index 0000000..3b673d7
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
+import com.microsoft.azure.storage.table.TableServiceEntity;
+
+/**
+ * Azure Table entity for a single checkpointed offset. The partition key holds the task name,
+ * the row key holds the serialized SystemStreamPartition, and {@link #getOffset()} holds the offset.
+ */
+public class TaskCheckpointEntity extends TableServiceEntity {
+
+  private String offset;
+
+  /**
+   * Creates an empty entity. NOTE(review): presumably needed by the Azure table SDK, which is
+   * handed {@code TaskCheckpointEntity.class} for query results; confirm before removing.
+   */
+  public TaskCheckpointEntity() {
+  }
+
+  /**
+   * Creates an entity keyed by task name and serialized SSP.
+   *
+   * @param taskName value stored as the partition key
+   * @param systemStreamPartition serialized SSP stored as the row key
+   * @param offset offset to record for that SSP
+   */
+  public TaskCheckpointEntity(String taskName, String systemStreamPartition, String offset) {
+    this.offset = offset;
+    this.partitionKey = taskName;
+    this.rowKey = systemStreamPartition;
+  }
+
+  /** Returns the stored offset ({@code null} if it was never set). */
+  public String getOffset() {
+    return offset;
+  }
+
+  /** Replaces the stored offset. */
+  public void setOffset(String offset) {
+    this.offset = offset;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
index dc96d2d..d8002b7 100644
--- a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
@@ -43,7 +43,7 @@ public class AzureConfig extends MapConfig {
     tableName = "samzatable" + id;
   }
 
-  public String getAzureConnect() {
+  public String getAzureConnectionString() {
     if (!containsKey(AZURE_STORAGE_CONNECT)) {
       throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!");
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index b689f3e..f50ab72 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -32,7 +32,7 @@ public class AzureCoordinationUtils implements CoordinationUtils {
 
   public AzureCoordinationUtils(Config config) {
     azureConfig = new AzureConfig(config);
-    this.client = new AzureClient(azureConfig.getAzureConnect());
+    this.client = new AzureClient(azureConfig.getAzureConnectionString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 622932f..468705b 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -103,7 +103,7 @@ public class AzureJobCoordinator implements JobCoordinator {
     processorId = createProcessorId(config);
     currentJMVersion = new AtomicReference<>(INITIAL_STATE);
     AzureConfig azureConfig = new AzureConfig(config);
-    client = new AzureClient(azureConfig.getAzureConnect());
+    client = new AzureClient(azureConfig.getAzureConnectionString());
     leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength());
     errorHandler = (errorMsg) -> {
       LOG.error(errorMsg);

http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
new file mode 100644
index 0000000..3e5ead0
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.azure;
+
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Integration tests for {@link AzureCheckpointManager}. They need a real Azure storage account,
+ * so they are ignored by default; set {@code storageConnectionString} to run them.
+ */
+@Ignore("Integration Test")
+public class ITestAzureCheckpointManager {
+
+  private static String storageConnectionString = "";
+  private static CheckpointManager checkpointManager;
+
+  @BeforeClass
+  public static void setupAzureTable() {
+    AzureCheckpointManagerFactory factory = new AzureCheckpointManagerFactory();
+    checkpointManager = factory.getCheckpointManager(getConfig(), new NoOpMetricsRegistry());
+
+    checkpointManager.start();
+    checkpointManager.clearCheckpoints();
+  }
+
+  @AfterClass
+  public static void teardownAzureTable() {
+    checkpointManager.clearCheckpoints();
+    checkpointManager.stop();
+  }
+
+  private static Config getConfig() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(AzureConfig.AZURE_STORAGE_CONNECT, storageConnectionString);
+
+    return new MapConfig(configMap);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsSamePartition() {
+    Partition partition = new Partition(0);
+    TaskName taskName = new TaskName("taskName0");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition);
+
+    // Separate maps: Checkpoint may keep a reference to the map it is given, so mutating a shared
+    // map would silently change cp0 as well and the overwrite would go untested.
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+    sspMap.put(ssp, "12345");
+    Checkpoint cp0 = new Checkpoint(sspMap);
+
+    Map<SystemStreamPartition, String> sspMap1 = new HashMap<>();
+    sspMap1.put(ssp, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap1);
+
+    checkpointManager.register(taskName);
+
+    checkpointManager.writeCheckpoint(taskName, cp0);
+    Checkpoint readCp = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp0, readCp);
+
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp1, readCp1);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsMultiPartitions() {
+    Partition partition = new Partition(0);
+    Partition partition1 = new Partition(1);
+    TaskName taskName = new TaskName("taskName");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition);
+    SystemStreamPartition ssp1 = new SystemStreamPartition("Azure", "Stream", partition1);
+
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+    sspMap.put(ssp, "12345");
+    sspMap.put(ssp1, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap);
+
+    Map<SystemStreamPartition, String> sspMap2 = new HashMap<>();
+    sspMap2.put(ssp, "12347");
+    sspMap2.put(ssp1, "54323");
+    Checkpoint cp2 = new Checkpoint(sspMap2);
+
+    checkpointManager.register(taskName);
+
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp1, readCp1);
+
+    checkpointManager.writeCheckpoint(taskName, cp2);
+    Checkpoint readCp2 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp2, readCp2);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsMultiTasks() {
+    Partition partition = new Partition(0);
+    Partition partition1 = new Partition(1);
+    TaskName taskName = new TaskName("taskName1");
+    TaskName taskName1 = new TaskName("taskName2");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition);
+    SystemStreamPartition ssp1 = new SystemStreamPartition("Azure", "Stream", partition1);
+
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+    sspMap.put(ssp, "12345");
+    sspMap.put(ssp1, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap);
+
+    Map<SystemStreamPartition, String> sspMap2 = new HashMap<>();
+    sspMap2.put(ssp, "12347");
+    sspMap2.put(ssp1, "54323");
+    Checkpoint cp2 = new Checkpoint(sspMap2);
+
+    checkpointManager.register(taskName);
+    checkpointManager.register(taskName1);
+
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    checkpointManager.writeCheckpoint(taskName1, cp2);
+
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertNotNull(readCp1);
+    Assert.assertEquals(cp1, readCp1);
+
+    Checkpoint readCp2 = checkpointManager.readLastCheckpoint(taskName1);
+    Assert.assertNotNull(readCp2);
+    Assert.assertEquals(cp2, readCp2);
+
+    checkpointManager.writeCheckpoint(taskName, cp2);
+    checkpointManager.writeCheckpoint(taskName1, cp1);
+
+    readCp1 = checkpointManager.readLastCheckpoint(taskName1);
+    Assert.assertEquals(cp1, readCp1);
+
+    readCp2 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp2, readCp2);
+  }
+
+  @Test
+  public void testMultipleBatchWrites() {
+    TaskName taskName = new TaskName("taskName3");
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+
+    final int testBatchNum = 2;
+    final int testOffsetNum = testBatchNum * AzureCheckpointManager.MAX_WRITE_BATCH_SIZE;
+
+    for (int i = 0; i < testOffsetNum; i++) {
+      Partition partition = new Partition(i);
+      SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition);
+      sspMap.put(ssp, String.valueOf(i));
+    }
+
+    Checkpoint cp0 = new Checkpoint(sspMap);
+    checkpointManager.register(taskName);
+    checkpointManager.writeCheckpoint(taskName, cp0);
+    Checkpoint readCp = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp0, readCp);
+  }
+}