Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/04/16 21:44:08 UTC

[GitHub] [samza] dxichen opened a new pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

dxichen opened a new pull request #1489:
URL: https://github.com/apache/samza/pull/1489


   Introduces a new checkpoint message format (CheckpointV2) that includes state markers (Kafka changelog offset / Ambry blob ID).
   
   Renames the previous checkpoint message format to CheckpointV1.
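
   A rough sketch of the difference between the two formats, based on the constructors introduced in this PR (the semi-colon marker layout and factory name come from the diffs below; the values and the static `CheckpointId.create()` factory are assumptions for illustration):

```java
import java.util.Collections;
import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointV1;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.system.SystemStreamPartition;

public class CheckpointFormatSketch {
  public static void main(String[] args) {
    SystemStreamPartition inputSsp =
        new SystemStreamPartition("kafka", "input-stream", new Partition(0));

    // CheckpointV1: one flat offset map (input SSPs, plus changelog SSPs for Kafka-backed stores).
    CheckpointV1 v1 = new CheckpointV1(Collections.singletonMap(inputSsp, "42"));

    // CheckpointV2: input offsets and per-backend, per-store state markers are kept separate.
    Map<String, Map<String, String>> stateMarkers = Collections.singletonMap(
        "org.apache.samza.storage.KafkaChangelogStateBackendFactory",       // state backend factory name
        Collections.singletonMap("store1", "kafka;store1-changelog;0;50")); // store -> state marker
    CheckpointV2 v2 = new CheckpointV2(
        CheckpointId.create(), Collections.singletonMap(inputSsp, "42"), stateMarkers);
  }
}
```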


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616993155



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -19,54 +19,27 @@
 
 package org.apache.samza.checkpoint;
 
+import java.util.Map;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
 
+public interface Checkpoint {
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Gets the version number of the Checkpoint
+   * @return Short indicating the version number
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
-  }
+  short getVersion();
 
   /**
-   * Gets a unmodifiable view of the current Samza stream offsets.
-   * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
+   * Gets a unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
+   * The returned value differs based on the Checkpoint version:
+   * <ol>
+   *    <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
+   *      as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition} </li>
+   *    <li>For {@link CheckpointV2} returns the input offsets only.</li>
+   * </ol>

Review comment:
       I should have been a bit clearer. There are some niche use cases that end up consuming their own state changelogs, so what is the expectation there?







[GitHub] [samza] mynameborat commented on pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1489:
URL: https://github.com/apache/samza/pull/1489#issuecomment-833224543


   @dxichen We want to merge this into the master branch, right? Or were you thinking of doing one final merge into master after merging all 3 PRs into the feature branch?
   
   If it's the former, can you open a PR against master fixing the conflicts so that we can merge? If it's the latter, the Travis checks are failing; can you look into them before merging into the feature branch?
   





[GitHub] [samza] prateekm commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616966296



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -36,7 +36,7 @@
   private final long millis;
   private final long nanos;
 
-  public CheckpointId(long millis, long nanos) {
+  private CheckpointId(long millis, long nanos) {
     this.millis = millis;
     this.nanos = nanos;
   }

Review comment:
       For context, the nanos suffix was added as a workaround to distinguish b/w two store checkpoints that may have been created within the same milliseconds (e.g. when using manual commit with task coordinator).
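
   A minimal sketch of what that workaround looks like, assuming the static `create()` factory referenced in these comments (the exact implementation may differ):

```java
// Hypothetical sketch of CheckpointId.create(): the nanoTime() remainder only serves to
// disambiguate two checkpoints created within the same millisecond.
public static CheckpointId create() {
  return new CheckpointId(System.currentTimeMillis(), System.nanoTime() % 1_000_000);
}

// Serialized via toString() as "<millis>-<nanos>", e.g. "1614147487244-33577".
```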







[GitHub] [samza] mynameborat commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r624024017



##########
File path: samza-api/src/main/java/org/apache/samza/serializers/JsonCheckpointIdMixin.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * A mix-in Jackson class to convert {@link CheckpointId} to/from JSON.
+ */
+public abstract class JsonCheckpointIdMixin {

Review comment:
       unused file?

##########
File path: samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
##########
@@ -53,6 +56,18 @@
   // commit period in milliseconds
   public static final String COMMIT_MS = "task.commit.ms";

Review comment:
       Can you create a JIRA and mark this resolved?







[GitHub] [samza] dxichen commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616971736



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -61,6 +61,11 @@ public long getNanos() {
     return nanos;
   }
 
+  /**
+   * WARNING: Do not change the toString() representation. It is used for serde'ing {@link CheckpointId} as part of task
+   * checkpoints, in conjunction with {@link #fromString(String)}.
+   * @return the String representation of this {@link CheckpointId}.
+   */

Review comment:
       Agreed, IMO keeping it in String format will make it easier to read; will rename it and use the Jackson model here and in all other occurrences

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -36,7 +36,7 @@
   private final long millis;
   private final long nanos;
 
-  public CheckpointId(long millis, long nanos) {
+  private CheckpointId(long millis, long nanos) {
     this.millis = millis;
     this.nanos = nanos;
   }

Review comment:
       I think in that case we can directly store `nanos % 1e6` as the class parameter; will make that change.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -19,54 +19,27 @@
 
 package org.apache.samza.checkpoint;
 
+import java.util.Map;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
 
+public interface Checkpoint {
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Gets the version number of the Checkpoint
+   * @return Short indicating the version number
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
-  }
+  short getVersion();
 
   /**
-   * Gets a unmodifiable view of the current Samza stream offsets.
-   * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
+   * Gets a unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
+   * The returned value differs based on the Checkpoint version:
+   * <ol>
+   *    <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
+   *      as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition} </li>
+   *    <li>For {@link CheckpointV2} returns the input offsets only.</li>
+   * </ol>

Review comment:
       Going forward, we are going to rely on the former: we will create CheckpointV2 with differentiated state offsets and input offsets. I have changed the references to checkpoint creation in this PR to reflect that property. Furthermore, the implementation of CheckpointV2 itself keeps input and state offsets separate.

##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -122,6 +139,15 @@ public StorageConfig(Config config) {
     return Optional.ofNullable(systemStreamRes);
   }
 
+  public List<String> getStoreBackupManagerClassName(String storeName) {
+    List<String> storeBackupManagers = getList(String.format(STORE_BACKEND_BACKUP_FACTORIES, storeName), new ArrayList<>());
+    // For backwards compatibility if the changelog is enabled, we use default kafka backup factory
+    if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+      storeBackupManagers = DEFAULT_STATE_BACKEND_BACKUP_FACTORIES;
+    }

Review comment:
       For the first phase of the migration, we are defaulting to restoring via Kafka, and the changelog will be enabled by default for all stores, with dual commit to both the blob store and the changelog.

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));

Review comment:
       will wrap in IAE
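
   A possible shape for that wrapping, as a sketch against the `fromString` code quoted above (message wording is illustrative):

```java
// Wrap the partition parse failure in an IllegalArgumentException instead of
// letting NumberFormatException escape to the caller.
Partition partition;
try {
  partition = new Partition(Integer.parseInt(payload[2]));
} catch (NumberFormatException e) {
  throw new IllegalArgumentException(
      "Invalid KafkaStateCheckpointMarker partition: " + stateCheckpointMarker, e);
}
```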

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.

Review comment:
       A version field makes sense here, will add
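
   One possible shape for that version field, purely illustrative (the actual format is whatever the follow-up commit defines):

```java
// Hypothetical: prefix the serialized marker with a version so the layout can evolve;
// fromString() would strip and validate this prefix before parsing the remaining fields.
public static final short MARKER_VERSION = 1;

@Override
public String toString() {
  return MARKER_VERSION + SEPARATOR
      + changelogSSP.getSystem() + SEPARATOR
      + changelogSSP.getStream() + SEPARATOR
      + changelogSSP.getPartition().getPartitionId() + SEPARATOR
      + changelogOffset;
}
```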

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+
+public class CheckpointV2 implements Checkpoint {
+  public static final short CHECKPOINT_VERSION = 2;
+
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, Map<String, String>> stateCheckpointMarkers;
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   *
+   * @param checkpointId {@link CheckpointId} associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of state backend factory name to map of local state store names
+   *                         to state checkpoints
+   */
+  public CheckpointV2(CheckpointId checkpointId,
+      Map<SystemStreamPartition, String> inputOffsets,
+      Map<String, Map<String, String>> stateCheckpoints) {
+    this.checkpointId = checkpointId;
+    this.inputOffsets = ImmutableMap.copyOf(inputOffsets);
+    this.stateCheckpointMarkers = ImmutableMap.copyOf(stateCheckpoints);

Review comment:
       Good point, they should not be null; adding invariant checks
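
   A sketch of what those invariant checks could look like in the constructor quoted above (using Guava's `Preconditions`, which the class already pulls in Guava for via `ImmutableMap`; exact messages are illustrative):

```java
public CheckpointV2(CheckpointId checkpointId,
    Map<SystemStreamPartition, String> inputOffsets,
    Map<String, Map<String, String>> stateCheckpoints) {
  // Fail fast with descriptive messages instead of an NPE deep inside ImmutableMap.copyOf.
  this.checkpointId = Preconditions.checkNotNull(checkpointId, "checkpointId must not be null");
  this.inputOffsets = ImmutableMap.copyOf(
      Preconditions.checkNotNull(inputOffsets, "inputOffsets must not be null"));
  this.stateCheckpointMarkers = ImmutableMap.copyOf(
      Preconditions.checkNotNull(stateCheckpoints, "stateCheckpoints must not be null"));
}
```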

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/JsonCheckpoint.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.util.Map;
+
+/**
+ * Used for Json serialization of the {@link org.apache.samza.checkpoint.Checkpoint} class by the
+ * {@link CheckpointV2Serde}
+ * This cannot be an internal class as required by Jackson Object mapper
+ */
+public class JsonCheckpoint {
+  private String checkpointId;
+  private Map<String, Map<String, String>> inputOffsets;
+  // Map<StorageBackendFactoryName, Map<StoreName, StateCheckpointMarker>>

Review comment:
       will change this to the mixin model
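
   A rough sketch of the mix-in model being referred to, assuming `CheckpointId` exposes `getMillis()`/`getNanos()` (`getNanos()` appears in the diff above; `getMillis()` is assumed):

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical mix-in: maps CheckpointId's (millis, nanos) constructor and getters for Jackson
// without annotating the checkpoint class itself.
public abstract class JsonCheckpointIdMixin {
  @JsonCreator
  JsonCheckpointIdMixin(@JsonProperty("millis") long millis, @JsonProperty("nanos") long nanos) { }

  @JsonProperty("millis")
  abstract long getMillis();

  @JsonProperty("nanos")
  abstract long getNanos();
}
```

   It would be registered on the mapper with something like `objectMapper.addMixIn(CheckpointId.class, JsonCheckpointIdMixin.class)`.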

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));
+    String offset = null;
+    if (!"null".equals(payload[3])) {
+      offset = payload[3];
+    }
+
+    return new KafkaStateCheckpointMarker(new SystemStreamPartition(system, stream, partition), offset);
+  }
+
+  public SystemStreamPartition getChangelogSSP() {
+    return changelogSSP;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  /**
+   * Builds a map of store changelog SSPs to their offset for Kafka changelog backed stores from the provided
+   * map of state backend factory name to map of store name to serialized state checkpoint markers.
+   *
+   * @param stateBackendToStoreSCMs Map of state backend factory name to map of store name to serialized
+   *                                state checkpoint markers
+   * @return Map of store changelog SSPss to their optional offset, or an empty map if there is no mapping for
+   * {@link #KAFKA_STATE_BACKEND_FACTORY_NAME} in the input map. Optional offset may be empty if the
+   * changelog SSP was empty.
+   */
+  public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
+      Map<String, Map<String, String>> stateBackendToStoreSCMs) {
+    Map<SystemStreamPartition, Option<String>> sspToOffsetOptions = new HashMap<>();
+    if (stateBackendToStoreSCMs.containsKey(KAFKA_STATE_BACKEND_FACTORY_NAME)) {
+      Map<String, String> storeToKafkaSCMs = stateBackendToStoreSCMs.get(KAFKA_STATE_BACKEND_FACTORY_NAME);
+      storeToKafkaSCMs.forEach((key, value) -> {
+        KafkaStateCheckpointMarker stateMarker = KafkaStateCheckpointMarker.fromString(value);
+        Option<String> offsetOption = Option.apply(stateMarker.getChangelogOffset());
+        sspToOffsetOptions.put(new SystemStreamPartition(stateMarker.getChangelogSSP()), offsetOption);
+      });
+    }
+    return sspToOffsetOptions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;

Review comment:
       will fix here and everywhere else







[GitHub] [samza] prateekm commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616976966



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -61,6 +61,11 @@ public long getNanos() {
     return nanos;
   }
 
+  /**
+   * WARNING: Do not change the toString() representation. It is used for serde'ing {@link CheckpointId} as part of task
+   * checkpoints, in conjunction with {@link #fromString(String)}.
+   * @return the String representation of this {@link CheckpointId}.
+   */

Review comment:
       @dxichen 
   > use the Jackson model here and in all other occurrences
   
   Is that backwards compatible?







[GitHub] [samza] prateekm commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616970406



##########
File path: samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
##########
@@ -53,6 +56,18 @@
   // commit period in milliseconds
   public static final String COMMIT_MS = "task.commit.ms";
   static final long DEFAULT_COMMIT_MS = 60000L;
+  // upper bound for the task commit thread pool size in a container .
+  // num threads == min(num tasks in container, max thread pool size)
+  public static final String COMMIT_MAX_THREAD_POOL_SIZE = "task.commit.max.thread.pool.size";
+  static final int DEFAULT_COMMIT_MAX_THREAD_POOL_SIZE = 64;
+  // maximum amount of time a task may continue processing while a previous commit is pending
+  public static final String COMMIT_MAX_DELAY_MS = "task.commit.max.delay.ms";
+  static final long DEFAULT_COMMIT_MAX_DELAY_MS = Duration.ofMinutes(10).toMillis();

Review comment:
       For context, `task.commit.max.delay.ms` is the amount of time the task may continue processing while the previous commit is still in progress. A longer default value for this timeout is better if using the blob store state backend (since compactions may occasionally increase the state size to be uploaded by a large amount), and will not hurt when using the Kafka state backend (it will typically finish much earlier).
   
   `task.commit.timeout.ms` determines how long the task blocks waiting for the commit to complete when the previous "grace period" is exceeded. If this value is <= 0 it will throw an exception immediately, else it will wait for the provided interval and then throw an exception.
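
   To make the relationship concrete, a hypothetical job config using the defaults mentioned in this thread (not a recommendation):

```properties
# Commit every 60 seconds.
task.commit.ms=60000
# Grace period: how long a task may keep processing while the previous commit is still in progress.
task.commit.max.delay.ms=600000
# Once the grace period is exceeded, how long to block waiting for the commit before failing
# (<= 0 fails immediately).
task.commit.timeout.ms=60000
```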







[GitHub] [samza] dxichen commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r618091339



##########
File path: samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
##########
@@ -53,6 +56,18 @@
   // commit period in milliseconds
   public static final String COMMIT_MS = "task.commit.ms";

Review comment:
       Going to address this in a follow-up PR after this feature is fully released.







[GitHub] [samza] dxichen commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616981383



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -61,6 +61,11 @@ public long getNanos() {
     return nanos;
   }
 
+  /**
+   * WARNING: Do not change the toString() representation. It is used for serde'ing {@link CheckpointId} as part of task
+   * checkpoints, in conjunction with {@link #fromString(String)}.
+   * @return the String representation of this {@link CheckpointId}.
+   */

Review comment:
       We would have to accept both versions in the fromString/deserialize method to preserve backwards compatibility
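
   A sketch of what accepting both versions could look like, assuming the legacy format is the `"<millis>-<nanos>"` string shown in the serde javadoc (the Jackson fallback is a hypothetical helper):

```java
public static CheckpointId fromString(String serialized) {
  // Legacy toString() format, e.g. "1614147487244-33577".
  if (serialized.matches("\\d+-\\d+")) {
    String[] parts = serialized.split("-");
    return new CheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
  }
  // Assumed helper for the newer Jackson-based representation.
  return deserializeWithJackson(serialized);
}
```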







[GitHub] [samza] mynameborat commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r615911984



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -61,6 +61,11 @@ public long getNanos() {
     return nanos;
   }
 
+  /**
+   * WARNING: Do not change the toString() representation. It is used for serde'ing {@link CheckpointId} as part of task
+   * checkpoints, in conjunction with {@link #fromString(String)}.
+   * @return the String representation of this {@link CheckpointId}.
+   */

Review comment:
       Why use `toString()` for serde? A string representation is useful for debugging in general, and overloading it as a serde format would either result in a compromise on readability or an inefficient serde representation.
   
   A few questions:
   1. Why deviate from the serde approach in the code base (using Jackson)?
   2. If your intent is to have a readable representation, you could still use Jackson and persist it as a string representation instead of bytes?
   
   If you still want to leave it as `String`, consider using `to(...)`/`from(...)` (or maybe `serialize(...)`/`deserialize(...)`) and leave `toString()` alone.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -36,7 +36,7 @@
   private final long millis;
   private final long nanos;
 
-  public CheckpointId(long millis, long nanos) {
+  private CheckpointId(long millis, long nanos) {
     this.millis = millis;
     this.nanos = nanos;
   }

Review comment:
       Why use nanos if the creation path loses the precision (e.g. `System.nanoTime() % 1e6`) in the `create()` code below?
   It is mostly a question, given you have made the constructor private (so the only usage is within this class), about what it means to lose the precision of the `nanos` field.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+
+public class CheckpointV2 implements Checkpoint {
+  public static final short CHECKPOINT_VERSION = 2;
+
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, Map<String, String>> stateCheckpointMarkers;
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   *
+   * @param checkpointId {@link CheckpointId} associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of state backend factory name to map of local state store names
+   *                         to state checkpoints
+   */
+  public CheckpointV2(CheckpointId checkpointId,
+      Map<SystemStreamPartition, String> inputOffsets,
+      Map<String, Map<String, String>> stateCheckpoints) {
+    this.checkpointId = checkpointId;
+    this.inputOffsets = ImmutableMap.copyOf(inputOffsets);
+    this.stateCheckpointMarkers = ImmutableMap.copyOf(stateCheckpoints);

Review comment:
       Can any of these be null? Do we need some invariant checks here?

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));

Review comment:
       Do we want to bubble up `IllegalArgumentException` for the invalid parsing exception too, or is this intended to throw `NumberFormatException` to the caller?

##########
File path: samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
##########
@@ -53,6 +56,18 @@
   // commit period in milliseconds
   public static final String COMMIT_MS = "task.commit.ms";
   static final long DEFAULT_COMMIT_MS = 60000L;
+  // upper bound for the task commit thread pool size in a container .
+  // num threads == min(num tasks in container, max thread pool size)
+  public static final String COMMIT_MAX_THREAD_POOL_SIZE = "task.commit.max.thread.pool.size";
+  static final int DEFAULT_COMMIT_MAX_THREAD_POOL_SIZE = 64;
+  // maximum amount of time a task may continue processing while a previous commit is pending
+  public static final String COMMIT_MAX_DELAY_MS = "task.commit.max.delay.ms";
+  static final long DEFAULT_COMMIT_MAX_DELAY_MS = Duration.ofMinutes(10).toMillis();

Review comment:
       This seems like a bigger window for duplicate processing; should we consider lower values here in OSS?
   Can you clarify the relation between this configuration and `task.commit.timeout.ms` in the configuration document, if not here? Especially since `task.commit.timeout.ms` defaults to 1 minute, I wonder why we need 10 minutes as the default.

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/JsonCheckpoint.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.util.Map;
+
+/**
+ * Used for Json serialization of the {@link org.apache.samza.checkpoint.Checkpoint} class by the
+ * {@link CheckpointV2Serde}
+ * This cannot be an internal class as required by Jackson Object mapper
+ */
+public class JsonCheckpoint {
+  private String checkpointId;
+  private Map<String, Map<String, String>> inputOffsets;
+  // Map<StorageBackendFactoryName, Map<StoreName, StateCheckpointMarker>>

Review comment:
       Should this be in the API module, given that changes to the constructor or the order of parameters can result in serde errors?
   Also, why is it different from the mixin pattern used elsewhere?

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV1.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+public class CheckpointV1 implements Checkpoint {
+  public static final short CHECKPOINT_VERSION = 1;
+
+  private final Map<SystemStreamPartition, String> offsets;
+
+  /**
+   * Constructs a new checkpoint based off a map of Samza stream offsets.
+   *
+   * @param offsets Map of Samza streams to their current offset.
+   */
+  public CheckpointV1(Map<SystemStreamPartition, String> offsets) {
+    this.offsets = offsets;
+  }
+
+  public short getVersion() {
+    return CHECKPOINT_VERSION;
+  }
+
+  /**
+   * Gets a unmodifiable view of the current Samza input stream offsets.
+   *
+   * @return a unmodifiable view of a Map of Samza streams to their recorded offsets.
+   */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsets() {
+    return Collections.unmodifiableMap(offsets);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof CheckpointV1)) return false;
+
+    CheckpointV1 that = (CheckpointV1) o;
+
+    return Objects.equals(offsets, that.offsets);
+  }
+
+  @Override
+  public int hashCode() {
+    return offsets != null ? offsets.hashCode() : 0;
+  }

Review comment:
       Use `Objects.hash(...)` to be consistent.
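
   i.e., something along these lines:

```java
@Override
public int hashCode() {
  return Objects.hash(offsets);
}
```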

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -19,54 +19,27 @@
 
 package org.apache.samza.checkpoint;
 
+import java.util.Map;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
 
+public interface Checkpoint {
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Gets the version number of the Checkpoint
+   * @return Short indicating the version number
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
-  }
+  short getVersion();
 
   /**
-   * Gets a unmodifiable view of the current Samza stream offsets.
-   * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
+   * Gets a unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
+   * The returned value differs based on the Checkpoint version:
+   * <ol>
+   *    <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
+   *      as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition} </li>
+   *    <li>For {@link CheckpointV2} returns the input offsets only.</li>
+   * </ol>

Review comment:
       How does `CheckpointV2` enforce that only input offsets are stored? More specifically, does it rely on the implicit assumption that framework developers differentiate changelog offsets from input offsets, or are there checks and balances in the implementation of `CheckpointV2`?

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/CheckpointV2Serde.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.system.SystemStreamPartition;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * The {@link Serde} for {@link CheckpointV2} which includes {@link CheckpointId}s and state checkpoint markers
+ * in addition to the input {@link SystemStreamPartition} offsets.
+ *
+ * {@link CheckpointId} is serde'd using {@link CheckpointId#toString()} and {@link CheckpointId#fromString(String)}.
+ *
+ * The overall payload is serde'd as JSON using {@link JsonSerdeV2}. Since the Samza classes cannot be directly
+ * serialized by Jackson, we use {@link JsonCheckpoint} as an intermediate POJO to help with serde.
+ *
+ * The serialized JSON looks as follows:
+ * <pre>
+ * {
+ *   "checkpointId" : "1614147487244-33577",
+ *   "inputOffsets" : {
+ *     "SystemStreamPartition [test-system, test-stream, 777]" : {
+ *       "system" : "SystemName",
+ *       "stream" : "StreamName"
+ *       "partition" : "777",
+ *       "offset" : "1",
+ *     }
+ *   },
+ *   "stateCheckpointMarkers" : {
+ *     "org.apache.samza.kafka.KafkaChangelogStateBackendFactory" : {
+ *       "store1": "changelogSystem;changelogTopic1;1;50"
+ *       "store2": "changelogSystem;changelogTopic2;1;51"
+ *     },
+ *     "factory2": {...}
+ *   }
+ * }
+ * </pre>
+ */
+public class CheckpointV2Serde implements Serde<CheckpointV2> {
+  private static final Serde<JsonCheckpoint> JSON_SERDE = new JsonSerdeV2<>(JsonCheckpoint.class);
+  private static final String SYSTEM = "system";
+  private static final String STREAM = "stream";
+  private static final String PARTITION = "partition";
+  private static final String OFFSET = "offset";
+
+  public CheckpointV2Serde() { }
+
+  @Override
+  public CheckpointV2 fromBytes(byte[] bytes) {
+    try {
+      JsonCheckpoint jsonCheckpoint = JSON_SERDE.fromBytes(bytes);
+      Map<SystemStreamPartition, String> sspOffsets = new HashMap<>();
+
+      jsonCheckpoint.getInputOffsets().forEach((sspName, sspInfo) -> {
+        String system = sspInfo.get(SYSTEM);
+        checkNotNull(system, String.format(
+            "System must be present in JSON-encoded SystemStreamPartition, input offsets map: %s, ", sspInfo));
+        String stream = sspInfo.get(STREAM);
+        checkNotNull(stream, String.format(
+            "Stream must be present in JSON-encoded SystemStreamPartition, input offsets map: %s, ", sspInfo));
+        String partition = sspInfo.get(PARTITION);
+        checkNotNull(partition, String.format(
+            "Partition must be present in JSON-encoded SystemStreamPartition, input offsets map: %s, ", sspInfo));
+        String offset = sspInfo.get(OFFSET);
+        checkNotNull(offset, String.format(
+            "Offset must be present in JSON-encoded SystemStreamPartition, input offsets map: %s, ", sspInfo));
+        sspOffsets.put(new SystemStreamPartition(system, stream, new Partition(Integer.parseInt(partition))), offset);
+      });
+
+
+      return new CheckpointV2(CheckpointId.fromString(jsonCheckpoint.getCheckpointId()), sspOffsets,
+          jsonCheckpoint.getStateCheckpointMarkers());
+    } catch (Exception e) {
+      throw new SamzaException(String.format("Exception while deserializing checkpoint: %s", Arrays.toString(bytes)), e);
+    }
+  }
+
+  @Override
+  public byte[] toBytes(CheckpointV2 checkpoint) {
+    try {
+      String checkpointId = checkpoint.getCheckpointId().toString();
+      Map<String, Map<String, String>> inputOffsets = new HashMap<>();
+
+      // TODO HIGH dchen change format to write specific serdes similar to Samza Object Mapper
+      // Serialize input offsets as maps keyed by the SSP.toString() to the another map of the constituent SSP
+      // components and offset. Jackson can't automatically serialize the SSP since it's not a POJO and this avoids

Review comment:
       Should we be doing this as part of the PR?

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));
+    String offset = null;
+    if (!"null".equals(payload[3])) {
+      offset = payload[3];
+    }
+
+    return new KafkaStateCheckpointMarker(new SystemStreamPartition(system, stream, partition), offset);
+  }
+
+  public SystemStreamPartition getChangelogSSP() {
+    return changelogSSP;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  /**
+   * Builds a map of store changelog SSPs to their offset for Kafka changelog backed stores from the provided
+   * map of state backend factory name to map of store name to serialized state checkpoint markers.
+   *
+   * @param stateBackendToStoreSCMs Map of state backend factory name to map of store name to serialized
+   *                                state checkpoint markers
+   * @return Map of store changelog SSPss to their optional offset, or an empty map if there is no mapping for
+   * {@link #KAFKA_STATE_BACKEND_FACTORY_NAME} in the input map. Optional offset may be empty if the
+   * changelog SSP was empty.
+   */
+  public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
+      Map<String, Map<String, String>> stateBackendToStoreSCMs) {
+    Map<SystemStreamPartition, Option<String>> sspToOffsetOptions = new HashMap<>();
+    if (stateBackendToStoreSCMs.containsKey(KAFKA_STATE_BACKEND_FACTORY_NAME)) {
+      Map<String, String> storeToKafkaSCMs = stateBackendToStoreSCMs.get(KAFKA_STATE_BACKEND_FACTORY_NAME);
+      storeToKafkaSCMs.forEach((key, value) -> {
+        KafkaStateCheckpointMarker stateMarker = KafkaStateCheckpointMarker.fromString(value);
+        Option<String> offsetOption = Option.apply(stateMarker.getChangelogOffset());
+        sspToOffsetOptions.put(new SystemStreamPartition(stateMarker.getChangelogSSP()), offsetOption);
+      });
+    }
+    return sspToOffsetOptions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;
+    return Objects.equals(changelogSSP, that.changelogSSP) &&
+        Objects.equals(changelogOffset, that.changelogOffset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(changelogSSP, changelogOffset);
+  }
+
+  /**
+   * WARNING: Do not change the toString() representation. It is used for serde'ing {@link KafkaStateCheckpointMarker}s,
+   * in conjunction with {@link #fromString(String)}.
+   * @return the String representation of this {@link KafkaStateCheckpointMarker}
+   */
+  @Override
+  public String toString() {
+    return String.format("%s%s%s%s%s%s%s",
+        changelogSSP.getSystem(), SEPARATOR, changelogSSP.getStream(), SEPARATOR,
+        changelogSSP.getPartition().getPartitionId(), SEPARATOR, changelogOffset);

Review comment:
       Same as above: consider using an explicit serde method, or explain why Jackson, the preferred choice elsewhere in the code base, isn't used here. A rough sketch of a Jackson-based approach is below.
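       For illustration only, a minimal sketch of what a Jackson-based serde for this marker could look like. The KafkaStateCheckpointMarkerJsonSerde class name and the JSON field keys are assumptions for the example, not part of this PR:

           // Hypothetical sketch: serde'ing the marker fields through Jackson instead of
           // toString()/fromString(). Class name and field keys are illustrative only.
           import java.io.IOException;
           import java.util.HashMap;
           import java.util.Map;
           import com.fasterxml.jackson.databind.ObjectMapper;
           import org.apache.samza.Partition;
           import org.apache.samza.SamzaException;
           import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker;
           import org.apache.samza.system.SystemStreamPartition;

           public class KafkaStateCheckpointMarkerJsonSerde {
             private static final ObjectMapper MAPPER = new ObjectMapper();

             public String serialize(KafkaStateCheckpointMarker marker) {
               // Flatten the marker into a simple string map before writing it out as JSON.
               Map<String, String> fields = new HashMap<>();
               fields.put("system", marker.getChangelogSSP().getSystem());
               fields.put("stream", marker.getChangelogSSP().getStream());
               fields.put("partition", String.valueOf(marker.getChangelogSSP().getPartition().getPartitionId()));
               fields.put("offset", marker.getChangelogOffset());
               try {
                 return MAPPER.writeValueAsString(fields);
               } catch (IOException e) {
                 throw new SamzaException("Error serializing KafkaStateCheckpointMarker", e);
               }
             }

             public KafkaStateCheckpointMarker deserialize(String json) {
               try {
                 Map<String, String> fields = MAPPER.readValue(json, Map.class);
                 SystemStreamPartition ssp = new SystemStreamPartition(fields.get("system"), fields.get("stream"),
                     new Partition(Integer.parseInt(fields.get("partition"))));
                 return new KafkaStateCheckpointMarker(ssp, fields.get("offset"));
               } catch (IOException e) {
                 throw new SamzaException("Error deserializing KafkaStateCheckpointMarker", e);
               }
             }
           }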

##########
File path: samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
##########
@@ -53,6 +56,18 @@
   // commit period in milliseconds
   public static final String COMMIT_MS = "task.commit.ms";

Review comment:
       Update the configuration docs on the website to reflect the new configurations?

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.

Review comment:
       Do we need a version field for future evolution, or is this format not expected to evolve?

##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -240,6 +266,29 @@ public long getChangelogMinCompactionLagMs(String storeName) {
     return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs());
   }
 
+
+  public Set<String> getStateBackendBackupFactories() {
+    Set<String> stateBackupFactories = new HashSet<>();
+    getStoreNames().forEach((storeName) -> {
+      List<String> storeBackupFactory = getStoreBackupManagerClassName(storeName);
+      if (!storeBackupFactory.isEmpty()) {
+        stateBackupFactories.addAll(storeBackupFactory);
+      }
+    });
+    return stateBackupFactories;

Review comment:
       Simplify by using the streams API, like the sketch below? Not sure if you originally wrote it this way to handle some branching.
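       A minimal sketch of the streams-based version; it assumes the getStoreNames() and getStoreBackupManagerClassName() methods from the quoted diff and is an illustration rather than an exact suggested replacement:

           // Hypothetical rewrite of getStateBackendBackupFactories() inside StorageConfig using the streams API.
           // Empty per-store factory lists simply contribute nothing to the resulting set.
           import java.util.Set;
           import java.util.stream.Collectors;

           public Set<String> getStateBackendBackupFactories() {
             return getStoreNames().stream()
                 .flatMap(storeName -> getStoreBackupManagerClassName(storeName).stream())
                 .collect(Collectors.toSet());
           }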

##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -122,6 +139,15 @@ public StorageConfig(Config config) {
     return Optional.ofNullable(systemStreamRes);
   }
 
+  public List<String> getStoreBackupManagerClassName(String storeName) {
+    List<String> storeBackupManagers = getList(String.format(STORE_BACKEND_BACKUP_FACTORIES, storeName), new ArrayList<>());
+    // For backwards compatibility if the changelog is enabled, we use default kafka backup factory
+    if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+      storeBackupManagers = DEFAULT_STATE_BACKEND_BACKUP_FACTORIES;
+    }

Review comment:
       Can users accidentally configure this in a way that is incompatible with the changelog-enabled configuration? I.e., should the changelog only be enabled for Kafka-backed stores?

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));
+    String offset = null;
+    if (!"null".equals(payload[3])) {
+      offset = payload[3];
+    }
+
+    return new KafkaStateCheckpointMarker(new SystemStreamPartition(system, stream, partition), offset);
+  }
+
+  public SystemStreamPartition getChangelogSSP() {
+    return changelogSSP;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  /**
+   * Builds a map of store changelog SSPs to their offset for Kafka changelog backed stores from the provided
+   * map of state backend factory name to map of store name to serialized state checkpoint markers.
+   *
+   * @param stateBackendToStoreSCMs Map of state backend factory name to map of store name to serialized
+   *                                state checkpoint markers
+   * @return Map of store changelog SSPs to their optional offset, or an empty map if there is no mapping for
+   * {@link #KAFKA_STATE_BACKEND_FACTORY_NAME} in the input map. Optional offset may be empty if the
+   * changelog SSP was empty.
+   */
+  public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
+      Map<String, Map<String, String>> stateBackendToStoreSCMs) {
+    Map<SystemStreamPartition, Option<String>> sspToOffsetOptions = new HashMap<>();
+    if (stateBackendToStoreSCMs.containsKey(KAFKA_STATE_BACKEND_FACTORY_NAME)) {
+      Map<String, String> storeToKafkaSCMs = stateBackendToStoreSCMs.get(KAFKA_STATE_BACKEND_FACTORY_NAME);
+      storeToKafkaSCMs.forEach((key, value) -> {
+        KafkaStateCheckpointMarker stateMarker = KafkaStateCheckpointMarker.fromString(value);
+        Option<String> offsetOption = Option.apply(stateMarker.getChangelogOffset());
+        sspToOffsetOptions.put(new SystemStreamPartition(stateMarker.getChangelogSSP()), offsetOption);
+      });
+    }
+    return sspToOffsetOptions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;

Review comment:
       Can we be consistent in either using `instanceof` or comparing `getClass()`? Also, please use curly braces for conditional statements for consistency; I don't think our code base encourages braceless conditionals. A sketch of that style is below.
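       For illustration, a minimal sketch of the style being asked for (an instanceof check plus fully-braced conditionals); this is an assumption about the intent, not a snippet from the PR:

           // Hypothetical restyling of equals() for KafkaStateCheckpointMarker.
           @Override
           public boolean equals(Object o) {
             if (this == o) {
               return true;
             }
             if (!(o instanceof KafkaStateCheckpointMarker)) {
               return false;
             }
             KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;
             return Objects.equals(changelogSSP, that.changelogSSP)
                 && Objects.equals(changelogOffset, that.changelogOffset);
           }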

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+
+public class CheckpointV2 implements Checkpoint {
+  public static final short CHECKPOINT_VERSION = 2;
+
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, Map<String, String>> stateCheckpointMarkers;
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   *
+   * @param checkpointId {@link CheckpointId} associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of state backend factory name to map of local state store names
+   *                         to state checkpoints
+   */
+  public CheckpointV2(CheckpointId checkpointId,
+      Map<SystemStreamPartition, String> inputOffsets,
+      Map<String, Map<String, String>> stateCheckpoints) {
+    this.checkpointId = checkpointId;
+    this.inputOffsets = ImmutableMap.copyOf(inputOffsets);
+    this.stateCheckpointMarkers = ImmutableMap.copyOf(stateCheckpoints);
+  }
+
+  public short getVersion() {
+    return CHECKPOINT_VERSION;
+  }
+
+  /**
+   * Gets the checkpoint id for the checkpoint
+   * @return The timestamp based checkpoint identifier associated with the checkpoint
+   */
+  public CheckpointId getCheckpointId() {
+    return checkpointId;
+  }
+
+  /**
+   * Gets an unmodifiable view of the current input {@link SystemStreamPartition} offsets.
+   * @return An unmodifiable map of input {@link SystemStreamPartition}s to their recorded offsets.
+   */
+  @Override
+  public Map<SystemStreamPartition, String> getOffsets() {
+    return inputOffsets;
+  }
+
+  /**
+   * Gets the state checkpoint markers for all stores for each configured state backend.
+   *
+   * Note: We don't add this method to the {@link Checkpoint} interface since it is difficult
+   * to implement it for {@link CheckpointV1} without changing the underlying serialization format -
+   * the changelog SSP offsets are serialized in the same way as input offsets, and at
+   * deserialization time we don't have enough information (e.g. configs) to decide whether a
+   * particular entry is for an input SSP or a changelog SSP.
+   *
+   * @return Map of state backend factory name to map of local state store names to state checkpoint markers
+   */
+  public Map<String, Map<String, String>> getStateCheckpointMarkers() {
+    return stateCheckpointMarkers;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof CheckpointV2)) return false;
+
+    CheckpointV2 that = (CheckpointV2) o;
+
+    return checkpointId.equals(that.checkpointId) &&

Review comment:
       why not `Objects.equals(...)`?

##########
File path: samza-azure/src/main/java/org/apache/samza/checkpoint/azure/AzureCheckpointManager.java
##########
@@ -119,7 +120,8 @@ public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
 
     TableBatchOperation batchOperation = new TableBatchOperation();
 
-    Iterator<Map.Entry<SystemStreamPartition, String>> iterator = checkpoint.getOffsets().entrySet().iterator();
+    Iterator<Map.Entry<SystemStreamPartition, String>> iterator =
+        ((CheckpointV1) checkpoint).getOffsets().entrySet().iterator();

Review comment:
       Add a precondition above to ensure that only `CheckpointV1` is passed (see the sketch below).
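       A minimal sketch of such a precondition, assuming Guava's Preconditions (Guava is already a Samza dependency); the exact error message wording is an assumption:

           // Hypothetical guard before the cast: fail fast if anything other than CheckpointV1 is passed in.
           Preconditions.checkArgument(checkpoint instanceof CheckpointV1,
               "AzureCheckpointManager only supports CheckpointV1, but got: " + checkpoint.getClass().getName());
           Iterator<Map.Entry<SystemStreamPartition, String>> iterator =
               ((CheckpointV1) checkpoint).getOffsets().entrySet().iterator();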




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat merged pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1489:
URL: https://github.com/apache/samza/pull/1489


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616991206



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -36,7 +36,7 @@
   private final long millis;
   private final long nanos;
 
-  public CheckpointId(long millis, long nanos) {
+  private CheckpointId(long millis, long nanos) {
     this.millis = millis;
     this.nanos = nanos;
   }

Review comment:
       Makes sense. I realized it's modulo and not division 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r624323071



##########
File path: samza-api/src/main/java/org/apache/samza/serializers/JsonCheckpointIdMixin.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * A mix-in Jackson class to convert {@link CheckpointId} to/from JSON.
+ */
+public abstract class JsonCheckpointIdMixin {

Review comment:
       This will be used in an upcoming patch (after Async Commit); will clean this up in the last patch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] prateekm commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616965346



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -61,6 +61,11 @@ public long getNanos() {
     return nanos;
   }
 
+  /**
+   * WARNING: Do not change the toString() representation. It is used for serde'ing {@link CheckpointId} as part of task
+   * checkpoints, in conjunction with {@link #fromString(String)}.
+   * @return the String representation of this {@link CheckpointId}.
+   */

Review comment:
       @mynameborat It didn't seem like it was worth adding the overhead of a separate serde class with the associated ceremony (serde factory, classloading the serde depending on the class version, etc.) for a relatively simple operation. The interface also isn't exactly the same, since this serdes to/from String while the Serde interfaces use byte[].
   
   It makes sense to rename them to 'to/from' or 'serialize/deserialize'.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org