You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/02/24 18:12:22 UTC

[GitHub] [samza] dxichen commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r578000163



##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}

Review comment:
       The default implementation exists because the KafkaBackupManagers currently do not need to be initialized, so I don't think a BackupManager should be forced to implement this.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new HashMap<>());

Review comment:
       This is useful when we are using the new reading scheme which looks for the checkpoint id on every checkpoint.

##########
File path: samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, StateCheckpointMarker] = {
     debug("Flushing stores.")
-    containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,

Review comment:
       It was originally in the snapshot call above; I moved it to this method.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {

Review comment:
       This is kept for backwards compatibility: the old checkpoints in the Kafka topic will be using this format.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);

Review comment:
       Good point. Since this is called for the SamzaContainer from a single thread, it does not need to be thread safe. This may change with further patches.

##########
File path: samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition(),
+          taskModel.getTaskMode(), new StorageManagerUtil());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Map<String, SystemStream> filteredStoreChangelogs = ContainerStorageManager
+        .getChangelogSystemStreams(containerModel, storeChangelogs, null);
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+    File nonLoggedStoreBaseDir = SamzaContainer.getNonLoggedStorageBaseDir(new JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          taskModel,
+          taskStores,
+          filteredStoreChangelogs,
+          systemAdmins,
+          null, // TODO @dchen have the restore managers create and manage Kafka consume lifecycle
+          KafkaChangelogStateBackendFactory
+              .getSspCache(systemAdmins, clock, Collections.emptySet()),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          taskModel,
+          filteredStoreChangelogs,
+          taskStores,
+          systemAdmins,
+          KafkaChangelogStateBackendFactory.getStreamCache(systemAdmins, clock),
+          null, // TODO  @dchen have the restore managers create and manage Kafka consume lifecycle

Review comment:
       This path is not yet used, but it will eventually need to be when the restore managers are refactored.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -224,8 +228,8 @@ class TaskInstance(
     val allCheckpointOffsets = new java.util.HashMap[SystemStreamPartition, String]()
     val inputCheckpoint = offsetManager.buildCheckpoint(taskName)
     if (inputCheckpoint != null) {
-      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getOffsets))
-      allCheckpointOffsets.putAll(inputCheckpoint.getOffsets)
+      trace("Got input offsets for taskName: %s as: %s" format(taskName, inputCheckpoint.getInputOffsets))
+      allCheckpointOffsets.putAll(inputCheckpoint.getInputOffsets)

Review comment:
       removed allCheckpointOffsets

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -73,8 +76,9 @@ class TaskInstance(
 
   private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
     (storeName: String) => {
-      if (storageManager != null && storageManager.getStore(storeName).isDefined) {
-        storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, _]]
+      if (containerStorageManager != null) {
+        val storeOption = JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption

Review comment:
       removed java optional




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org