You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/11/08 01:26:02 UTC
samza git commit: SAMZA-1486;
Checkpoint manager implementation with Azure Table
Repository: samza
Updated Branches:
refs/heads/master 05113c339 -> adfc4bfc4
SAMZA-1486; Checkpoint manager implementation with Azure Table
vjagadish1989
Author: Daniel Chen <29...@users.noreply.github.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #341 from dxichen/azure-checkpoint-manager
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/adfc4bfc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/adfc4bfc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/adfc4bfc
Branch: refs/heads/master
Commit: adfc4bfc40a5d92b3cc7e7813dd1a73c0c486e0e
Parents: 05113c3
Author: Daniel Chen <29...@users.noreply.github.com>
Authored: Tue Nov 7 17:25:58 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 7 17:25:58 2017 -0800
----------------------------------------------------------------------
.../samza/checkpoint/CheckpointManager.java | 6 +-
.../azure/AzureCheckpointManager.java | 236 +++++++++++++++++++
.../azure/AzureCheckpointManagerFactory.java | 33 +++
.../checkpoint/azure/TaskCheckpointEntity.java | 43 ++++
.../org/apache/samza/config/AzureConfig.java | 2 +-
.../coordinator/AzureCoordinationUtils.java | 2 +-
.../samza/coordinator/AzureJobCoordinator.java | 2 +-
.../azure/ITestAzureCheckpointManager.java | 181 ++++++++++++++
8 files changed, 501 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 10f166c..bc75351 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -44,10 +44,14 @@ public interface CheckpointManager {
/**
* Returns the last recorded checkpoint for a specified taskName.
* @param taskName Specific Samza taskName for which to get the last checkpoint of.
- * @return A Checkpoint object with the recorded offset data of the specified partition.
+ * @return A Checkpoint object with the recorded offset data of the specified partition
+ * or null if there are no recorded checkpoints for the task.
*/
Checkpoint readLastCheckpoint(TaskName taskName);
+ /**
+ * Perform teardown operations for the Manager. Checkpoints are still persisted.
+ */
void stop();
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
new file mode 100644
index 0000000..df3e490
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
@@ -0,0 +1,236 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
+import com.google.common.collect.ImmutableMap;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.table.*;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.SystemStreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * Azure checkpoint manager is used to store checkpoints in a Azure Table.
+ * All the task checkpoints are added to the a single table named "SamzaTaskCheckpoints".
+ * The table entities take the following form:
+ *
+ * +-----------------+---------------------+-------------------+
+ * | | Serialized | |
+ * | TaskName | JSON SSP | Offset |
+ * | | | |
+ * +-----------------+---------------------+-------------------+
+ *
+ * Each entity have a partitionKey set as the TaskName and the rowKey set as the SSP.
+ */
+public class AzureCheckpointManager implements CheckpointManager {
+ private static final Logger LOG = LoggerFactory.getLogger(AzureCheckpointManager.class.getName());
+ private static final String PARTITION_KEY = "PartitionKey";
+
+ public static final int MAX_WRITE_BATCH_SIZE = 100;
+ public static final String CHECKPOINT_MANAGER_TABLE_NAME = "SamzaTaskCheckpoints";
+ public static final String SYSTEM_PROP_NAME = "system";
+ public static final String STREAM_PROP_NAME = "stream";
+ public static final String PARTITION_PROP_NAME = "partition";
+
+ private final String storageConnectionString;
+ private final AzureClient azureClient;
+ private CloudTable cloudTable;
+
+ private final Set<TaskName> taskNames = new HashSet<>();
+ private final JsonSerdeV2<Map<String, String>> jsonSerde = new JsonSerdeV2<>();
+
+ AzureCheckpointManager(AzureConfig azureConfig) {
+ storageConnectionString = azureConfig.getAzureConnectionString();
+ azureClient = new AzureClient(storageConnectionString);
+ }
+
+ @Override
+ public void start() {
+ try {
+ // Create the table if it doesn't exist.
+ cloudTable = azureClient.getTableClient().getTableReference(CHECKPOINT_MANAGER_TABLE_NAME);
+ cloudTable.createIfNotExists();
+
+ } catch (URISyntaxException e) {
+ LOG.error("Connection string {} specifies an invalid URI while creating checkpoint table.",
+ storageConnectionString);
+ throw new AzureException(e);
+
+ } catch (StorageException e) {
+ LOG.error("Azure Storage failed when creating checkpoint table", e);
+ throw new AzureException(e);
+ }
+ }
+
+ @Override
+ public void register(TaskName taskName) {
+ taskNames.add(taskName);
+ }
+
+ @Override
+ public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
+ if (!taskNames.contains(taskName)) {
+ throw new SamzaException("writing checkpoint of unregistered task");
+ }
+
+ TableBatchOperation batchOperation = new TableBatchOperation();
+
+ Iterator<Map.Entry<SystemStreamPartition, String>> iterator = checkpoint.getOffsets().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<SystemStreamPartition, String> entry = iterator.next();
+ SystemStreamPartition ssp = entry.getKey();
+ String offset = entry.getValue();
+
+ // Create table entity
+ TaskCheckpointEntity taskCheckpoint = new TaskCheckpointEntity(taskName.toString(),
+ serializeSystemStreamPartition(ssp), offset);
+
+ // Add to batch operation
+ batchOperation.insertOrReplace(taskCheckpoint);
+
+ // Execute when batch reaches capacity or this is the last item
+ if (batchOperation.size() >= MAX_WRITE_BATCH_SIZE || !iterator.hasNext()) {
+ try {
+ cloudTable.execute(batchOperation);
+ } catch (StorageException e) {
+ LOG.error("Executing batch failed for task: {}", taskName);
+ throw new AzureException(e);
+ }
+ batchOperation.clear();
+ }
+ }
+ }
+
+ private String serializeSystemStreamPartition(SystemStreamPartition ssp) {
+ // Create the Json string for SystemStreamPartition
+ Map<String, String> sspMap = new HashMap<>();
+
+ sspMap.put(SYSTEM_PROP_NAME, ssp.getSystem());
+ sspMap.put(STREAM_PROP_NAME, ssp.getStream());
+ sspMap.put(PARTITION_PROP_NAME, String.valueOf(ssp.getPartition().getPartitionId()));
+
+ return new String(jsonSerde.toBytes(sspMap));
+ }
+
+ private SystemStreamPartition deserializeSystemStreamPartition(String serializedSSP) {
+ Map<String, String> sspPropertiesMap = jsonSerde.fromBytes(serializedSSP.getBytes());
+
+ String systemName = sspPropertiesMap.get(SYSTEM_PROP_NAME);
+ String streamName = sspPropertiesMap.get(STREAM_PROP_NAME);
+ Partition partition = new Partition(Integer.parseInt(sspPropertiesMap.get("partition")));
+
+ return new SystemStreamPartition(systemName, streamName, partition);
+ }
+
+ @Override
+ public Checkpoint readLastCheckpoint(TaskName taskName) {
+ if (!taskNames.contains(taskName)) {
+ throw new SamzaException("reading checkpoint of unregistered/unwritten task");
+ }
+
+ // Create the query for taskName
+ String partitionQueryKey = taskName.toString();
+ String partitionFilter = TableQuery.generateFilterCondition(
+ PARTITION_KEY,
+ TableQuery.QueryComparisons.EQUAL,
+ partitionQueryKey);
+ TableQuery<TaskCheckpointEntity> query = TableQuery.from(TaskCheckpointEntity.class).where(partitionFilter);
+
+ ImmutableMap.Builder<SystemStreamPartition, String> builder = ImmutableMap.builder();
+ try {
+ for (TaskCheckpointEntity taskCheckpointEntity : cloudTable.execute(query)) {
+ // Recreate the SSP offset
+ String serializedSSP = taskCheckpointEntity.getRowKey();
+ builder.put(deserializeSystemStreamPartition(serializedSSP), taskCheckpointEntity.getOffset());
+ }
+
+ } catch (NoSuchElementException e) {
+ LOG.warn("No checkpoints found found for registered taskName={}", taskName);
+ // Return null if not entity elements are not found
+ return null;
+ }
+ LOG.debug("Received checkpoint state for taskName=%s", taskName);
+ return new Checkpoint(builder.build());
+ }
+
+ @Override
+ public void stop() {
+ // Nothing to do here
+ }
+
+ @Override
+ public void clearCheckpoints() {
+ LOG.debug("Clearing all checkpoints in Azure table");
+
+ for (TaskName taskName : taskNames) {
+ String partitionQueryKey = taskName.toString();
+
+ // Generate table query
+ String partitionFilter = TableQuery.generateFilterCondition(
+ PARTITION_KEY,
+ TableQuery.QueryComparisons.EQUAL,
+ partitionQueryKey);
+ TableQuery<TaskCheckpointEntity> partitionQuery = TableQuery.from(TaskCheckpointEntity.class)
+ .where(partitionFilter);
+
+ // All entities in a given batch must have the same partition key
+ deleteEntities(cloudTable.execute(partitionQuery).iterator());
+ }
+ }
+
+ private void deleteEntities(Iterator<TaskCheckpointEntity> entitiesToDelete) {
+ TableBatchOperation batchOperation = new TableBatchOperation();
+
+ while (entitiesToDelete.hasNext()) {
+ TaskCheckpointEntity entity = entitiesToDelete.next();
+
+ // Add to batch operation
+ batchOperation.delete(entity);
+
+ // Execute when batch reaches capacity or when this is the last item
+ if (batchOperation.size() >= MAX_WRITE_BATCH_SIZE || !entitiesToDelete.hasNext()) {
+ try {
+ cloudTable.execute(batchOperation);
+ } catch (StorageException e) {
+ LOG.error("Executing batch failed for deleting checkpoints");
+ throw new AzureException(e);
+ }
+ batchOperation.clear();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
new file mode 100644
index 0000000..3c5d62a
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManagerFactory.java
@@ -0,0 +1,33 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointManagerFactory;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public class AzureCheckpointManagerFactory implements CheckpointManagerFactory {
+ @Override
+ public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry) {
+ return new AzureCheckpointManager(new AzureConfig(config));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
new file mode 100644
index 0000000..3b673d7
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/checkpoint/azure/TaskCheckpointEntity.java
@@ -0,0 +1,43 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.checkpoint.azure;
+
+import com.microsoft.azure.storage.table.TableServiceEntity;
+
+/**
+ * Azure Table entity holding one checkpointed offset of a task.
+ *
+ * <p>The partition key is the task name and the row key is the serialized
+ * SystemStreamPartition, so a (task, SSP) pair addresses a single row.
+ */
+public class TaskCheckpointEntity extends TableServiceEntity {
+
+  private String offset;
+
+  /**
+   * No-arg constructor; presumably needed so the Azure SDK can instantiate entities
+   * reflectively when materializing query results -- confirm before removing.
+   */
+  public TaskCheckpointEntity() {
+  }
+
+  /**
+   * @param taskName value stored as the partition key
+   * @param systemStreamPartition serialized SSP stored as the row key
+   * @param offset checkpointed offset for that SSP
+   */
+  public TaskCheckpointEntity(String taskName, String systemStreamPartition, String offset) {
+    partitionKey = taskName;
+    rowKey = systemStreamPartition;
+    this.offset = offset;
+  }
+
+  /** @return the checkpointed offset */
+  public String getOffset() {
+    return offset;
+  }
+
+  /** @param offset the checkpointed offset */
+  public void setOffset(String offset) {
+    this.offset = offset;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
index dc96d2d..d8002b7 100644
--- a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
@@ -43,7 +43,7 @@ public class AzureConfig extends MapConfig {
tableName = "samzatable" + id;
}
- public String getAzureConnect() {
+ public String getAzureConnectionString() {
if (!containsKey(AZURE_STORAGE_CONNECT)) {
throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!");
}
http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index b689f3e..f50ab72 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -32,7 +32,7 @@ public class AzureCoordinationUtils implements CoordinationUtils {
public AzureCoordinationUtils(Config config) {
azureConfig = new AzureConfig(config);
- this.client = new AzureClient(azureConfig.getAzureConnect());
+ this.client = new AzureClient(azureConfig.getAzureConnectionString());
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
index 622932f..468705b 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -103,7 +103,7 @@ public class AzureJobCoordinator implements JobCoordinator {
processorId = createProcessorId(config);
currentJMVersion = new AtomicReference<>(INITIAL_STATE);
AzureConfig azureConfig = new AzureConfig(config);
- client = new AzureClient(azureConfig.getAzureConnect());
+ client = new AzureClient(azureConfig.getAzureConnectionString());
leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength());
errorHandler = (errorMsg) -> {
LOG.error(errorMsg);
http://git-wip-us.apache.org/repos/asf/samza/blob/adfc4bfc/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
new file mode 100644
index 0000000..3e5ead0
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.azure;
+
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Integration tests for {@link AzureCheckpointManager} that run against a live Azure Table.
+ * Ignored by default; set {@code STORAGE_CONNECTION_STRING} to a real storage account
+ * connection string before enabling them.
+ */
+@Ignore("Integration Test")
+public class ITestAzureCheckpointManager {
+
+  // Left empty on purpose: fill in locally and never commit real credentials.
+  private static final String STORAGE_CONNECTION_STRING = "";
+  private static CheckpointManager checkpointManager;
+
+  @BeforeClass
+  public static void setupAzureTable() {
+    AzureCheckpointManagerFactory factory = new AzureCheckpointManagerFactory();
+    checkpointManager = factory.getCheckpointManager(getConfig(), new NoOpMetricsRegistry());
+
+    checkpointManager.start();
+    // NOTE(review): clearCheckpoints only touches registered tasks and none are registered yet,
+    // so this call does not remove rows left behind by earlier runs.
+    checkpointManager.clearCheckpoints();
+  }
+
+  @AfterClass
+  public static void teardownAzureTable() {
+    checkpointManager.clearCheckpoints();
+    checkpointManager.stop();
+  }
+
+  private static Config getConfig() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(AzureConfig.AZURE_STORAGE_CONNECT, STORAGE_CONNECTION_STRING);
+
+    return new MapConfig(configMap);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsSamePartition() {
+    Partition partition = new Partition(0);
+    TaskName taskName = new TaskName("taskName0");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition);
+
+    // Separate maps: Checkpoint may keep a reference to the map it is given, so mutating one
+    // shared map would also change cp0 and the overwrite below would go untested.
+    Map<SystemStreamPartition, String> sspMap0 = new HashMap<>();
+    sspMap0.put(ssp, "12345");
+    Checkpoint cp0 = new Checkpoint(sspMap0);
+
+    Map<SystemStreamPartition, String> sspMap1 = new HashMap<>();
+    sspMap1.put(ssp, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap1);
+
+    checkpointManager.register(taskName);
+
+    checkpointManager.writeCheckpoint(taskName, cp0);
+    Checkpoint readCp = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp0, readCp);
+
+    // A second write for the same SSP must replace the stored offset.
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp1, readCp1);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsMultiPartitions() {
+    Partition partition = new Partition(0);
+    Partition partition1 = new Partition(1);
+    TaskName taskName = new TaskName("taskName");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition);
+    SystemStreamPartition ssp1 = new SystemStreamPartition("Azure", "Stream", partition1);
+
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+    sspMap.put(ssp, "12345");
+    sspMap.put(ssp1, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap);
+
+    Map<SystemStreamPartition, String> sspMap2 = new HashMap<>();
+    sspMap2.put(ssp, "12347");
+    sspMap2.put(ssp1, "54323");
+    Checkpoint cp2 = new Checkpoint(sspMap2);
+
+    checkpointManager.register(taskName);
+
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp1, readCp1);
+
+    checkpointManager.writeCheckpoint(taskName, cp2);
+    Checkpoint readCp2 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp2, readCp2);
+  }
+
+  @Test
+  public void testStoringAndReadingCheckpointsMultiTasks() {
+    Partition partition = new Partition(0);
+    Partition partition1 = new Partition(1);
+    TaskName taskName = new TaskName("taskName1");
+    TaskName taskName1 = new TaskName("taskName2");
+    SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition);
+    SystemStreamPartition ssp1 = new SystemStreamPartition("Azure", "Stream", partition1);
+
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+    sspMap.put(ssp, "12345");
+    sspMap.put(ssp1, "54321");
+    Checkpoint cp1 = new Checkpoint(sspMap);
+
+    Map<SystemStreamPartition, String> sspMap2 = new HashMap<>();
+    sspMap2.put(ssp, "12347");
+    sspMap2.put(ssp1, "54323");
+    Checkpoint cp2 = new Checkpoint(sspMap2);
+
+    checkpointManager.register(taskName);
+    checkpointManager.register(taskName1);
+
+    checkpointManager.writeCheckpoint(taskName, cp1);
+    checkpointManager.writeCheckpoint(taskName1, cp2);
+
+    Checkpoint readCp1 = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertNotNull(readCp1);
+    Assert.assertEquals(cp1, readCp1);
+
+    Checkpoint readCp2 = checkpointManager.readLastCheckpoint(taskName1);
+    Assert.assertNotNull(readCp2);
+    Assert.assertEquals(cp2, readCp2);
+
+    // Swap the checkpoints between the two tasks; each task must see only its own latest write.
+    checkpointManager.writeCheckpoint(taskName, cp2);
+    checkpointManager.writeCheckpoint(taskName1, cp1);
+
+    Assert.assertEquals(cp1, checkpointManager.readLastCheckpoint(taskName1));
+    Assert.assertEquals(cp2, checkpointManager.readLastCheckpoint(taskName));
+  }
+
+  @Test
+  public void testMultipleBatchWrites() {
+    TaskName taskName = new TaskName("taskName3");
+    Map<SystemStreamPartition, String> sspMap = new HashMap<>();
+
+    // Enough offsets to fill exactly testBatchNum full write batches.
+    final int testBatchNum = 2;
+    final int testOffsetNum = testBatchNum * AzureCheckpointManager.MAX_WRITE_BATCH_SIZE;
+
+    for (int i = 0; i < testOffsetNum; i++) {
+      Partition partition = new Partition(i);
+      SystemStreamPartition ssp = new SystemStreamPartition("Azure", "Stream", partition);
+      sspMap.put(ssp, String.valueOf(i));
+    }
+
+    Checkpoint cp0 = new Checkpoint(sspMap);
+    checkpointManager.register(taskName);
+    checkpointManager.writeCheckpoint(taskName, cp0);
+    Checkpoint readCp = checkpointManager.readLastCheckpoint(taskName);
+    Assert.assertEquals(cp0, readCp);
+  }
+}