Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/04/20 19:28:23 UTC

[GitHub] [samza] dxichen commented on a change in pull request #1489: SAMZA-2591: Async Commit [1/3]: Checkpoint v2 migration

dxichen commented on a change in pull request #1489:
URL: https://github.com/apache/samza/pull/1489#discussion_r616971736



##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -61,6 +61,11 @@ public long getNanos() {
     return nanos;
   }
 
+  /**
+   * WARNING: Do not change the toString() representation. It is used for serde'ing {@link CheckpointId} as part of task
+   * checkpoints, in conjunction with {@link #fromString(String)}.
+   * @return the String representation of this {@link CheckpointId}.
+   */

Review comment:
       Agreed; IMO keeping it in String format will make it easier to read. Will rename it and use the Jackson model here and for all other occurrences.
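A minimal round-trip sketch of the String-based serde discussed above. `SimpleCheckpointId` is a hypothetical stand-in, not Samza's class, and the `millis-nanos` layout with a `-` separator is an assumption. The point it illustrates is that `fromString` must stay the exact inverse of `toString`, which is why the Javadoc warns against changing the representation.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified stand-in for CheckpointId. The "millis-nanos" layout and the
 * "-" separator are assumptions made for this sketch.
 */
final class SimpleCheckpointId {
  static final String SEPARATOR = "-";

  private final long millis;
  private final long nanos;

  SimpleCheckpointId(long millis, long nanos) {
    this.millis = millis;
    this.nanos = nanos;
  }

  // Persisted inside task checkpoints, so this layout must stay stable across releases.
  @Override
  public String toString() {
    return millis + SEPARATOR + nanos;
  }

  // Inverse of toString(): accepts exactly two numeric parts, otherwise throws IllegalArgumentException.
  static SimpleCheckpointId fromString(String serialized) {
    String[] parts = serialized == null ? new String[0] : serialized.split(SEPARATOR);
    if (parts.length != 2) {
      throw new IllegalArgumentException("Invalid CheckpointId: " + serialized);
    }
    try {
      return new SimpleCheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid CheckpointId: " + serialized, e);
    }
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SimpleCheckpointId)) {
      return false;
    }
    SimpleCheckpointId that = (SimpleCheckpointId) o;
    return millis == that.millis && nanos == that.nanos;
  }

  @Override
  public int hashCode() {
    return Objects.hash(millis, nanos);
  }
}
```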

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
##########
@@ -36,7 +36,7 @@
   private final long millis;
   private final long nanos;
 
-  public CheckpointId(long millis, long nanos) {
+  private CheckpointId(long millis, long nanos) {
     this.millis = millis;
     this.nanos = nanos;
   }

Review comment:
       I think in that case we can directly store `nanos % 1e6` as the class field; will make that change.
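Storing only the sub-millisecond remainder, together with the now-private constructor, could look roughly like this. `CompactCheckpointId`, `create()` and `of(...)` are hypothetical names. Two details are deliberate: an integer constant is used because `x % 1e6` in Java promotes the result to `double`, and `Math.floorMod` keeps the remainder non-negative because `System.nanoTime()` may return negative values.

```java
/**
 * Hypothetical sketch of a CheckpointId whose constructor is private (as in the diff) and
 * which stores only the sub-millisecond remainder of a nanosecond reading.
 */
final class CompactCheckpointId {
  // Integer constant: `x % 1e6` would produce a double in Java.
  private static final long NANOS_PER_MILLI = 1_000_000L;

  private final long millis;
  private final long nanos;

  // Private so every instance goes through a factory that enforces 0 <= nanos < NANOS_PER_MILLI.
  private CompactCheckpointId(long millis, long nanos) {
    this.millis = millis;
    this.nanos = nanos;
  }

  // Hypothetical factory: wall-clock millis plus the sub-millisecond part of System.nanoTime().
  static CompactCheckpointId create() {
    return of(System.currentTimeMillis(), System.nanoTime());
  }

  // floorMod rather than %, because System.nanoTime() may legitimately be negative.
  static CompactCheckpointId of(long millis, long rawNanos) {
    return new CompactCheckpointId(millis, Math.floorMod(rawNanos, NANOS_PER_MILLI));
  }

  long getMillis() {
    return millis;
  }

  long getNanos() {
    return nanos;
  }
}
```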

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -19,54 +19,27 @@
 
 package org.apache.samza.checkpoint;
 
+import java.util.Map;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
 
+public interface Checkpoint {
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Gets the version number of the Checkpoint
+   * @return Short indicating the version number
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
-  }
+  short getVersion();
 
   /**
-   * Gets a unmodifiable view of the current Samza stream offsets.
-   * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
+   * Gets an unmodifiable view of the last processed offsets for {@link SystemStreamPartition}s.
+   * The returned value differs based on the Checkpoint version:
+   * <ol>
+   *    <li>For {@link CheckpointV1}, returns the input {@link SystemStreamPartition} offsets, as well
+   *      as the latest KafkaStateChangelogOffset for any store changelog {@link SystemStreamPartition} </li>
+   *    <li>For {@link CheckpointV2} returns the input offsets only.</li>
+   * </ol>

Review comment:
       Going forward, we are going to rely on the former: we will create CheckpointV2 with state offsets and input offsets kept separate. I have changed the references to checkpoint creation in this PR to reflect that property. Furthermore, the CheckpointV2 implementation also keeps input and state offsets separate.
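The version-dependent `getOffsets()` contract from the Javadoc above can be sketched with a simplified model. All type names here are hypothetical, and offsets are keyed by a plain `system.stream.partition` String instead of `SystemStreamPartition` so the sketch has no Samza dependency. V1 returns input and changelog offsets from one map; V2 returns input offsets only and exposes state checkpoint markers separately.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the versioned Checkpoint interface.
interface SimpleCheckpoint {
  short getVersion();

  // V1: input offsets plus store changelog offsets; V2: input offsets only.
  Map<String, String> getOffsets();
}

final class SimpleCheckpointV1 implements SimpleCheckpoint {
  private final Map<String, String> offsets;

  SimpleCheckpointV1(Map<String, String> offsets) {
    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
  }

  @Override
  public short getVersion() {
    return 1;
  }

  @Override
  public Map<String, String> getOffsets() {
    return offsets; // input and changelog offsets live in the same map
  }
}

final class SimpleCheckpointV2 implements SimpleCheckpoint {
  private final Map<String, String> inputOffsets;
  // state backend factory name -> store name -> serialized state checkpoint marker (shallow copy)
  private final Map<String, Map<String, String>> stateCheckpointMarkers;

  SimpleCheckpointV2(Map<String, String> inputOffsets, Map<String, Map<String, String>> stateCheckpointMarkers) {
    this.inputOffsets = Collections.unmodifiableMap(new HashMap<>(inputOffsets));
    this.stateCheckpointMarkers = Collections.unmodifiableMap(new HashMap<>(stateCheckpointMarkers));
  }

  @Override
  public short getVersion() {
    return 2;
  }

  @Override
  public Map<String, String> getOffsets() {
    return inputOffsets; // state offsets are deliberately excluded
  }

  Map<String, Map<String, String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }
}
```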

##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -122,6 +139,15 @@ public StorageConfig(Config config) {
     return Optional.ofNullable(systemStreamRes);
   }
 
+  public List<String> getStoreBackupManagerClassName(String storeName) {
+    List<String> storeBackupManagers = getList(String.format(STORE_BACKEND_BACKUP_FACTORIES, storeName), new ArrayList<>());
+    // For backwards compatibility if the changelog is enabled, we use default kafka backup factory
+    if (storeBackupManagers.isEmpty() && getChangelogStream(storeName).isPresent()) {
+      storeBackupManagers = DEFAULT_STATE_BACKEND_BACKUP_FACTORIES;
+    }

Review comment:
       For the first phase of the migration, we are defaulting to restoring via Kafka, and the changelog will be enabled by default for all stores, with dual commit to both the blob store and the changelog.
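The backwards-compatibility default in `getStoreBackupManagerClassName` (fall back to Kafka backups when a store has a changelog but no explicit backup factories) can be sketched with a plain `Map` standing in for Samza's `Config`. `BackupFactoryResolver` is hypothetical, and the key formats `stores.%s.backup.factories` and `stores.%s.changelog`, the comma-separated parsing, and the single-entry default list are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class BackupFactoryResolver {
  // Assumed key formats; the real StorageConfig constants may differ.
  static final String STORE_BACKUP_FACTORIES = "stores.%s.backup.factories";
  static final String STORE_CHANGELOG = "stores.%s.changelog";
  // Hypothetical stand-in for DEFAULT_STATE_BACKEND_BACKUP_FACTORIES.
  static final List<String> DEFAULT_BACKUP_FACTORIES =
      Collections.singletonList("org.apache.samza.storage.KafkaChangelogStateBackendFactory");

  private final Map<String, String> config;

  BackupFactoryResolver(Map<String, String> config) {
    this.config = config;
  }

  List<String> getStoreBackupFactories(String storeName) {
    List<String> factories = new ArrayList<>();
    String raw = config.get(String.format(STORE_BACKUP_FACTORIES, storeName));
    if (raw != null) {
      for (String factory : raw.split(",")) {
        if (!factory.trim().isEmpty()) {
          factories.add(factory.trim());
        }
      }
    }
    // Backwards compatibility: a changelog-enabled store with no explicit factories keeps Kafka backups.
    if (factories.isEmpty() && config.containsKey(String.format(STORE_CHANGELOG, storeName))) {
      return DEFAULT_BACKUP_FACTORIES;
    }
    return factories;
  }
}
```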

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.KafkaChangelogStateBackendFactory;
+import org.apache.samza.system.SystemStreamPartition;
+import scala.Option;
+
+
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));

Review comment:
       Will wrap the `Integer.parseInt` failure in an IllegalArgumentException (IAE).
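A sketch of the wrapping agreed on above, applied to a `system;stream;partition;offset` marker. `ChangelogMarkerParser` and `ParsedMarker` are hypothetical; a plain value holder replaces `SystemStreamPartition` to keep it dependency-free. `NumberFormatException` already extends `IllegalArgumentException`, so the wrap mainly adds the full marker as context while keeping the original exception as the cause.

```java
/** Hypothetical parser for a "system;stream;partition;offset" marker. */
final class ChangelogMarkerParser {
  static final String SEPARATOR = ";";

  // Plain value holder used instead of SystemStreamPartition.
  static final class ParsedMarker {
    final String system;
    final String stream;
    final int partition;
    final String offset; // null when the serialized offset was "null"

    ParsedMarker(String system, String stream, int partition, String offset) {
      this.system = system;
      this.stream = stream;
      this.partition = partition;
      this.offset = offset;
    }
  }

  static ParsedMarker parse(String marker) {
    if (marker == null || marker.trim().isEmpty()) {
      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + marker);
    }
    String[] payload = marker.split(SEPARATOR);
    if (payload.length != 4) {
      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + marker);
    }
    int partition;
    try {
      partition = Integer.parseInt(payload[2]);
    } catch (NumberFormatException e) {
      // Wrap to add the whole marker to the message; the original exception is kept as the cause.
      throw new IllegalArgumentException("Invalid partition in KafkaStateCheckpointMarker: " + marker, e);
    }
    String offset = "null".equals(payload[3]) ? null : payload[3];
    return new ParsedMarker(payload[0], payload[1], partition, offset);
  }
}
```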

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.

Review comment:
       A version field makes sense here; will add it.
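One way a version field could be carried in the semicolon-separated marker is as a leading token, sketched below. The `version;system;stream;partition;offset` layout and the `VersionedChangelogMarker` class are hypothetical, not the format this PR adopts; the sketch only shows how a reader can reject markers written in a layout it does not understand. It assumes no field contains the separator.

```java
/** Hypothetical versioned marker: "version;system;stream;partition;offset". */
final class VersionedChangelogMarker {
  static final short MARKER_VERSION = 1;
  static final String SEPARATOR = ";";

  final String system;
  final String stream;
  final int partition;
  final String offset; // may be null

  VersionedChangelogMarker(String system, String stream, int partition, String offset) {
    this.system = system;
    this.stream = stream;
    this.partition = partition;
    this.offset = offset;
  }

  // Assumes none of the fields themselves contain SEPARATOR.
  String serialize() {
    return MARKER_VERSION + SEPARATOR + system + SEPARATOR + stream + SEPARATOR + partition
        + SEPARATOR + (offset == null ? "null" : offset);
  }

  static VersionedChangelogMarker deserialize(String serialized) {
    String[] parts = serialized == null ? new String[0] : serialized.split(SEPARATOR);
    if (parts.length != 5) {
      throw new IllegalArgumentException("Invalid marker: " + serialized);
    }
    short version;
    int partition;
    try {
      version = Short.parseShort(parts[0]);
      partition = Integer.parseInt(parts[3]);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid marker: " + serialized, e);
    }
    // Refuse markers from an unknown layout instead of misreading their fields.
    if (version != MARKER_VERSION) {
      throw new IllegalArgumentException("Unsupported marker version " + version + ": " + serialized);
    }
    String offset = "null".equals(parts[4]) ? null : parts[4];
    return new VersionedChangelogMarker(parts[1], parts[2], partition, offset);
  }
}
```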

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointV2.java
##########
@@ -0,0 +1,115 @@
+package org.apache.samza.checkpoint;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+
+public class CheckpointV2 implements Checkpoint {
+  public static final short CHECKPOINT_VERSION = 2;
+
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, Map<String, String>> stateCheckpointMarkers;
+
+  /**
+   * Constructs the checkpoint with separated input and state offsets
+   *
+   * @param checkpointId {@link CheckpointId} associated with this checkpoint
+   * @param inputOffsets Map of Samza system stream partition to offset of the checkpoint
+   * @param stateCheckpoints Map of state backend factory name to map of local state store names
+   *                         to state checkpoints
+   */
+  public CheckpointV2(CheckpointId checkpointId,
+      Map<SystemStreamPartition, String> inputOffsets,
+      Map<String, Map<String, String>> stateCheckpoints) {
+    this.checkpointId = checkpointId;
+    this.inputOffsets = ImmutableMap.copyOf(inputOffsets);
+    this.stateCheckpointMarkers = ImmutableMap.copyOf(stateCheckpoints);

Review comment:
       Good point; they should not be null. Adding invariant checks.
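A sketch of constructor-level invariant checks for a CheckpointV2-like class. `CheckedCheckpointV2` is hypothetical; a String id stands in for `CheckpointId` and String keys stand in for `SystemStreamPartition`. It uses the JDK's `Objects.requireNonNull`; in the original, which already depends on Guava, `Preconditions.checkNotNull` would serve the same purpose. Inner maps are copied as well, so later changes to the caller's maps do not leak into the checkpoint.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class CheckedCheckpointV2 {
  private final String checkpointId;
  private final Map<String, String> inputOffsets;
  // state backend factory name -> store name -> serialized state checkpoint marker
  private final Map<String, Map<String, String>> stateCheckpointMarkers;

  CheckedCheckpointV2(String checkpointId, Map<String, String> inputOffsets,
      Map<String, Map<String, String>> stateCheckpoints) {
    // Fail fast with a descriptive message instead of a bare NPE later on.
    this.checkpointId = Objects.requireNonNull(checkpointId, "checkpointId must not be null");
    Objects.requireNonNull(inputOffsets, "inputOffsets must not be null");
    Objects.requireNonNull(stateCheckpoints, "stateCheckpoints must not be null");

    this.inputOffsets = Collections.unmodifiableMap(new HashMap<>(inputOffsets));
    Map<String, Map<String, String>> copy = new HashMap<>();
    stateCheckpoints.forEach((factoryName, storeMarkers) -> copy.put(factoryName,
        Collections.unmodifiableMap(new HashMap<>(
            Objects.requireNonNull(storeMarkers, "markers for " + factoryName + " must not be null")))));
    this.stateCheckpointMarkers = Collections.unmodifiableMap(copy);
  }

  String getCheckpointId() {
    return checkpointId;
  }

  Map<String, String> getInputOffsets() {
    return inputOffsets;
  }

  Map<String, Map<String, String>> getStateCheckpointMarkers() {
    return stateCheckpointMarkers;
  }
}
```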

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/JsonCheckpoint.java
##########
@@ -0,0 +1,57 @@
+package org.apache.samza.serializers;
+
+import java.util.Map;
+
+/**
+ * Used for Json serialization of the {@link org.apache.samza.checkpoint.Checkpoint} class by the
+ * {@link CheckpointV2Serde}
+ * This cannot be an internal class as required by Jackson Object mapper
+ */
+public class JsonCheckpoint {
+  private String checkpointId;
+  private Map<String, Map<String, String>> inputOffsets;
+  // Map<StorageBackendFactoryName, Map<StoreName, StateCheckpointMarker>>

Review comment:
       Will change this to the Jackson mixin model.

##########
File path: samza-core/src/main/java/org/apache/samza/checkpoint/kafka/KafkaStateCheckpointMarker.java
##########
@@ -0,0 +1,131 @@
+/**
+ * Used as the serialization format for the state checkpoints of {@link org.apache.samza.checkpoint.CheckpointV2}
+ * for a store using {@link org.apache.samza.storage.KafkaTransactionalStateTaskBackupManager} or
+ * {@link org.apache.samza.storage.KafkaNonTransactionalStateTaskBackupManager} for tracking the latest committed
+ * store changelog offsets.
+ *
+ * Kafka state checkpoint marker has the format: [system, stream, partition, offset], separated by a semi-colon.
+ */
+@InterfaceStability.Unstable
+public class KafkaStateCheckpointMarker {
+  public static final String KAFKA_STATE_BACKEND_FACTORY_NAME = KafkaChangelogStateBackendFactory.class.getName();
+  public static final String SEPARATOR = ";";
+
+  private final SystemStreamPartition changelogSSP;
+  private final String changelogOffset;
+
+  public KafkaStateCheckpointMarker(SystemStreamPartition changelogSSP, String changelogOffset) {
+    this.changelogSSP = changelogSSP;
+    this.changelogOffset = changelogOffset;
+  }
+
+  public static KafkaStateCheckpointMarker fromString(String stateCheckpointMarker) {
+    if (StringUtils.isBlank(stateCheckpointMarker)) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker format: " + stateCheckpointMarker);
+    }
+    String[] payload = stateCheckpointMarker.split(KafkaStateCheckpointMarker.SEPARATOR);
+    if (payload.length != 4) {
+      throw new IllegalArgumentException("Invalid KafkaStateCheckpointMarker parts count: " + stateCheckpointMarker);
+    }
+
+    String system = payload[0];
+    String stream = payload[1];
+    Partition partition = new Partition(Integer.parseInt(payload[2]));
+    String offset = null;
+    if (!"null".equals(payload[3])) {
+      offset = payload[3];
+    }
+
+    return new KafkaStateCheckpointMarker(new SystemStreamPartition(system, stream, partition), offset);
+  }
+
+  public SystemStreamPartition getChangelogSSP() {
+    return changelogSSP;
+  }
+
+  public String getChangelogOffset() {
+    return changelogOffset;
+  }
+
+  /**
+   * Builds a map of store changelog SSPs to their offset for Kafka changelog backed stores from the provided
+   * map of state backend factory name to map of store name to serialized state checkpoint markers.
+   *
+   * @param stateBackendToStoreSCMs Map of state backend factory name to map of store name to serialized
+   *                                state checkpoint markers
+   * @return Map of store changelog SSPs to their optional offset, or an empty map if there is no mapping for
+   * {@link #KAFKA_STATE_BACKEND_FACTORY_NAME} in the input map. Optional offset may be empty if the
+   * changelog SSP was empty.
+   */
+  public static Map<SystemStreamPartition, Option<String>> scmsToSSPOffsetMap(
+      Map<String, Map<String, String>> stateBackendToStoreSCMs) {
+    Map<SystemStreamPartition, Option<String>> sspToOffsetOptions = new HashMap<>();
+    if (stateBackendToStoreSCMs.containsKey(KAFKA_STATE_BACKEND_FACTORY_NAME)) {
+      Map<String, String> storeToKafkaSCMs = stateBackendToStoreSCMs.get(KAFKA_STATE_BACKEND_FACTORY_NAME);
+      storeToKafkaSCMs.forEach((key, value) -> {
+        KafkaStateCheckpointMarker stateMarker = KafkaStateCheckpointMarker.fromString(value);
+        Option<String> offsetOption = Option.apply(stateMarker.getChangelogOffset());
+        sspToOffsetOptions.put(new SystemStreamPartition(stateMarker.getChangelogSSP()), offsetOption);
+      });
+    }
+    return sspToOffsetOptions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    KafkaStateCheckpointMarker that = (KafkaStateCheckpointMarker) o;

Review comment:
       Will fix here and everywhere else.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org