You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/04/16 21:46:51 UTC

[GitHub] [samza] dxichen opened a new pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

dxichen opened a new pull request #1490:
URL: https://github.com/apache/samza/pull/1490


   - Introduce new state backend APIs for blobstore and kafka changelog
   - Change the task commit lifecycle to separate snapshot, upload and cleanup phases
   - Make the TaskInstance commit upload and cleanup phases nonblocking


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625488651



##########
File path: samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private StreamMetadataCache streamCache;
+  private SSPMetadataCache sspCache;
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,
+      ExecutorService restoreExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir,
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Set<SystemStreamPartition> changelogSSPs = storeChangelogs.values().stream()
+        .flatMap(ss -> containerContext.getContainerModel().getTasks().values().stream()
+            .map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
+        .collect(Collectors.toSet());
+    // filter out standby store-ssp pairs
+    Map<String, SystemStream> filteredStoreChangelogs =
+        filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
+    SystemAdmins systemAdmins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          getSspCache(systemAdmins, clock, changelogSSPs),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          getStreamCache(systemAdmins, clock),
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    }
+  }
+
+  @Override
+  public TaskStorageAdmin getAdmin() {
+    throw new SamzaException("getAdmin() method not supported for KafkaStateBackendFactory");
+  }
+
+  /**
+   * Shared cache across all KafkaRestoreManagers for the Kafka topic
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @return StreamMetadataCache containing the stream metadata
+   */
+  private StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  /**
+   * Shared cache across KafkaRestoreManagers for the Kafka partition
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @param ssps SSPs to prefetch
+   * @return SSPMetadataCache containing the partition metadata
+   */
+  private SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  private Map<String, SystemStream> filterStandbySystemStreams(Map<String, SystemStream> changelogSystemStreams,
+      ContainerModel containerModel) {
+    Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
+    changelogSystemStreams.forEach((storeName, systemStream) ->
+        containerModel.getTasks().forEach((taskName, taskModel) ->
+            changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName))
+    );
+
+    Set<TaskModel> standbyTaskModels = containerModel.getTasks().values().stream()
+        .filter(taskModel -> taskModel.getTaskMode().equals(TaskMode.Standby))
+        .collect(Collectors.toSet());
+
+    // remove all standby task changelog ssps
+    standbyTaskModels.forEach((taskModel) -> {
+      changelogSystemStreams.forEach((storeName, systemStream) -> {
+        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
+        changelogSSPToStore.remove(ssp);
+      });
+    });

Review comment:
       good catch, will filter in the first place




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625498362



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {
+      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, checkpoint);
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(checkpoint));
+    } else {
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(null));
+    }
+  }
+
+  /**
+   * Synchronously captures the current state of the stores in order to persist it to the backup manager
+   * in the async {@link #upload(CheckpointId, Map)} phase. Returns a map of state backend factory name to
+   * a map of store name to state checkpoint markers for all configured state backends and stores.
+   *
+   * @param checkpointId {@link CheckpointId} of the current commit
+   * @return a map of state backend factory name to a map of store name to state checkpoint markers
+   */
+  public Map<String, Map<String, String>> snapshot(CheckpointId checkpointId) {
+    // Flush all stores
+    storageEngines.values().forEach(StorageEngine::flush);
+
+    // Checkpoint all persisted and durable stores
+    storageEngines.forEach((storeName, storageEngine) -> {
+      if (storageEngine.getStoreProperties().isPersistedToDisk() &&
+          storageEngine.getStoreProperties().isDurableStore()) {
+        storageEngine.checkpoint(checkpointId);
+      }
+    });

Review comment:
       isDurable is a superset of isLogged: isLogged refers specifically to Kafka changelog durability, whereas isDurable means the store is durable via either the blob store or the Kafka changelog.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625551596



##########
File path: samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private StreamMetadataCache streamCache;
+  private SSPMetadataCache sspCache;
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,
+      ExecutorService restoreExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir,
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Set<SystemStreamPartition> changelogSSPs = storeChangelogs.values().stream()
+        .flatMap(ss -> containerContext.getContainerModel().getTasks().values().stream()
+            .map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
+        .collect(Collectors.toSet());
+    // filter out standby store-ssp pairs
+    Map<String, SystemStream> filteredStoreChangelogs =
+        filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
+    SystemAdmins systemAdmins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          getSspCache(systemAdmins, clock, changelogSSPs),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          getStreamCache(systemAdmins, clock),
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    }
+  }
+
+  @Override
+  public TaskStorageAdmin getAdmin() {
+    throw new SamzaException("getAdmin() method not supported for KafkaStateBackendFactory");
+  }
+
+  /**
+   * Shared cache across all KafkaRestoreManagers for the Kafka topic
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @return StreamMetadataCache containing the stream metadata
+   */
+  private StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  /**
+   * Shared cache across KafkaRestoreManagers for the Kafka partition
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @param ssps SSPs to prefetch
+   * @return SSPMetadataCache containing the partition metadata
+   */
+  private SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;

Review comment:
       same as above




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625488925



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/KafkaTransactionalStateTaskBackupManager.scala
##########
@@ -0,0 +1,97 @@
+package org.apache.samza.storage
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util
+import java.util.concurrent.CompletableFuture
+
+import com.google.common.annotations.VisibleForTesting
+import com.google.common.collect.ImmutableSet
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId}
+import org.apache.samza.container.TaskName
+import org.apache.samza.system._
+import org.apache.samza.util.Logging
+import org.apache.samza.{Partition, SamzaException}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Manage all the storage engines for a given task
+ */
+class KafkaTransactionalStateTaskBackupManager(

Review comment:
       Since this is existing code, I will create a separate ticket for the cleanup.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on pull request #1490:
URL: https://github.com/apache/samza/pull/1490#issuecomment-821580514


   Please disregard the Checkpoint v2 migration commit since it is part of #1489 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625496838



##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -517,13 +514,16 @@ object SamzaContainer extends Logging {
     val loggedStorageBaseDir = getLoggedStorageBaseDir(jobConfig, defaultStoreBaseDir)
     info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
 
+    // TODO dchen should we enforce restore factories to be subset of backup factories?
+    val stateStorageBackendRestoreFactory = ReflectionUtil

Review comment:
       Removed this comment, since this check may not always be relevant; we may add it as needed during general onboarding.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625502173



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {
+      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, checkpoint);
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(checkpoint));
+    } else {
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(null));
+    }
+  }
+
+  /**
+   * Synchronously captures the current state of the stores in order to persist it to the backup manager
+   * in the async {@link #upload(CheckpointId, Map)} phase. Returns a map of state backend factory name to
+   * a map of store name to state checkpoint markers for all configured state backends and stores.
+   *
+   * @param checkpointId {@link CheckpointId} of the current commit
+   * @return a map of state backend factory name to a map of store name to state checkpoint markers
+   */
+  public Map<String, Map<String, String>> snapshot(CheckpointId checkpointId) {
+    // Flush all stores
+    storageEngines.values().forEach(StorageEngine::flush);
+
+    // Checkpoint all persisted and durable stores
+    storageEngines.forEach((storeName, storageEngine) -> {
+      if (storageEngine.getStoreProperties().isPersistedToDisk() &&
+          storageEngine.getStoreProperties().isDurableStore()) {
+        storageEngine.checkpoint(checkpointId);
+      }
+    });
+
+    // state backend factory -> store Name -> state checkpoint marker
+    Map<String, Map<String, String>> stateBackendToStoreSCMs = new HashMap<>();
+
+    // for each configured state backend factory, backup the state for all stores in this task.
+    stateBackendToBackupManager.forEach((stateBackendFactoryName, backupManager) -> {
+      Map<String, String> snapshotSCMs = backupManager.snapshot(checkpointId);
+      LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state backend: {}. Snapshot SCMs: {}",
+          taskName, checkpointId, stateBackendFactoryName, snapshotSCMs);
+      stateBackendToStoreSCMs.put(stateBackendFactoryName, snapshotSCMs);
+    });

Review comment:
       We need to back up all stores that are durable (or logged), and write a checkpoint (to file) for all state that is persisted. Similarly, changelog-enabled stores are checkpointed to disk only if they are persisted (i.e. not in-memory), which should preserve the existing behavior.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625488759



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.util
+import java.util.concurrent.CompletableFuture
+
+import com.google.common.collect.ImmutableSet
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId}
+import org.apache.samza.container.TaskName
+import org.apache.samza.system._
+import org.apache.samza.util.Logging
+import org.apache.samza.{Partition, SamzaException}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Manage all the storage engines for a given task
+ */
+class KafkaNonTransactionalStateTaskBackupManager(

Review comment:
       Let's keep this as Scala for now since it is existing code; I will create a follow-up to make this change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat merged pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1490:
URL: https://github.com/apache/samza/pull/1490


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625551561



##########
File path: samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private StreamMetadataCache streamCache;
+  private SSPMetadataCache sspCache;
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,
+      ExecutorService restoreExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir,
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Set<SystemStreamPartition> changelogSSPs = storeChangelogs.values().stream()
+        .flatMap(ss -> containerContext.getContainerModel().getTasks().values().stream()
+            .map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
+        .collect(Collectors.toSet());
+    // filter out standby store-ssp pairs
+    Map<String, SystemStream> filteredStoreChangelogs =
+        filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
+    SystemAdmins systemAdmins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          getSspCache(systemAdmins, clock, changelogSSPs),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          getStreamCache(systemAdmins, clock),
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    }
+  }
+
+  @Override
+  public TaskStorageAdmin getAdmin() {
+    throw new SamzaException("getAdmin() method not supported for KafkaStateBackendFactory");
+  }
+
+  /**
+   * Shared cache across all KafkaRestoreManagers for the Kafka topic
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @return StreamMetadataCache containing the stream metadata
+   */
+  private StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }

Review comment:
       The StreamMetadataCache itself allows for concurrent access. For the changelog backend factory, additional synchronization is not required since it is accessed synchronously.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625500132



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;

Review comment:
       Unfortunately, the storageEngines are not created at this point; they are created after init is called, which is why we pass in the containerStorageManager instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625484258



##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.Clock;
+
+
+/**
+ * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link TaskStorageAdmin}
+ * for a particular state storage backend, which are used to durably backup the Samza task state.
+ */
+public interface StateBackendFactory {
+  TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,

Review comment:
       JobModel is passed for potential forward compatibility, and ContainerModel is used for restores, so I wanted to keep the two factory signatures symmetrical.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r618938246



##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.Clock;
+
+
+/**
+ * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link TaskStorageAdmin}
+ * for a particular state storage backend, which are used to durably backup the Samza task state.
+ */
+public interface StateBackendFactory {
+  TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry taskInstanceMetricsRegistry,
+      Config config,
+      Clock clock);
+
+  TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,
+      ExecutorService restoreExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir,
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams);
+
+  TaskStorageAdmin getAdmin();

Review comment:
       [P2] Seems like this is doing nothing for now. Are we adding something in the upcoming PR here? 
   Also, doesn't it require any parameters for construction?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.Clock;
+
+
+/**
+ * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link TaskStorageAdmin}
+ * for a particular state storage backend, which are used to durably backup the Samza task state.
+ */
+public interface StateBackendFactory {
+  TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,

Review comment:
       [P1] Can you elaborate why job and container model are passed down here? Seems like unused parameters in this PR at the least.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/TaskStorageAdmin.java
##########
@@ -17,21 +17,11 @@
  * under the License.
  */
 
-package org.apache.samza.storage
+package org.apache.samza.storage;
 
-import org.apache.samza.checkpoint.CheckpointId
-import org.apache.samza.system.SystemStreamPartition
+public interface TaskStorageAdmin {

Review comment:
       [P2] Can we add javadocs?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+import java.util
+import java.util.concurrent.CompletableFuture
+
+import com.google.common.collect.ImmutableSet
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId}
+import org.apache.samza.container.TaskName
+import org.apache.samza.system._
+import org.apache.samza.util.Logging
+import org.apache.samza.{Partition, SamzaException}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Manage all the storage engines for a given task
+ */
+class KafkaNonTransactionalStateTaskBackupManager(

Review comment:
       [P1] Can we convert this to Java instead? It doesn't seem like a lot of code.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {

Review comment:
       [P1] It doesn't seem like `CheckpointManager` is null based on this PR. What is the scenario where it can be null? If not, please add null checks in the constructor.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
-  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+  val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+  val asyncUploadsCompleted = newCounter("async-uploads-completed")
+  val asyncUploadNs = newTimer("async-upload-ns")
+  val commitNs = newTimer("commit-ns")
+  val snapshotNs = newTimer("snapshot-ns")

Review comment:
       [P2] Are all of these reflected in the run loop's commit-ns metric? Which of them contribute to that metric, and which don't?
   Consider documenting these metrics in the metrics documentation.
   
   Also, would it make sense to prefix some of these metrics to differentiate them from the run loop metrics, or is the task group sufficient to differentiate them?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.Clock;
+
+
+/**
+ * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link TaskStorageAdmin}
+ * for a particular state storage backend, which are used to durably backup the Samza task state.
+ */
+public interface StateBackendFactory {
+  TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry taskInstanceMetricsRegistry,
+      Config config,
+      Clock clock);
+
+  TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,

Review comment:
       [P1] Seems inconsistent with the above. Maybe consider passing the same context as part of the signature of `getBackupManager` too so that access to `JobModel` and `ContainerModel` are all through the context variables which makes access pattern consistent in the code base and helps with evolution.
   
   That said, is the `TaskContext` available before instantiation so that `TaskModel` is also accessed through the context?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private StreamMetadataCache streamCache;
+  private SSPMetadataCache sspCache;
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,
+      ExecutorService restoreExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir,
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Set<SystemStreamPartition> changelogSSPs = storeChangelogs.values().stream()
+        .flatMap(ss -> containerContext.getContainerModel().getTasks().values().stream()
+            .map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
+        .collect(Collectors.toSet());
+    // filter out standby store-ssp pairs
+    Map<String, SystemStream> filteredStoreChangelogs =
+        filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
+    SystemAdmins systemAdmins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          getSspCache(systemAdmins, clock, changelogSSPs),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          getStreamCache(systemAdmins, clock),
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    }
+  }
+
+  @Override
+  public TaskStorageAdmin getAdmin() {
+    throw new SamzaException("getAdmin() method not supported for KafkaStateBackendFactory");
+  }
+
+  /**
+   * Shared cache across all KafkaRestoreManagers for the Kafka topic
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @return StreamMetadataCache containing the stream metadata
+   */
+  private StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  /**
+   * Shared cache across KafkaRestoreManagers for the Kafka partition
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @param ssps SSPs to prefetch
+   * @return SSPMetadataCache containing the partition metadata
+   */
+  private SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;

Review comment:
       [P1] Same question as for `getStreamCache`: do we need to worry about concurrent access to this lazily initialized cache?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private StreamMetadataCache streamCache;
+  private SSPMetadataCache sspCache;
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,
+      ExecutorService restoreExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir,
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Set<SystemStreamPartition> changelogSSPs = storeChangelogs.values().stream()
+        .flatMap(ss -> containerContext.getContainerModel().getTasks().values().stream()
+            .map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
+        .collect(Collectors.toSet());
+    // filter out standby store-ssp pairs
+    Map<String, SystemStream> filteredStoreChangelogs =
+        filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
+    SystemAdmins systemAdmins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          getSspCache(systemAdmins, clock, changelogSSPs),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          getStreamCache(systemAdmins, clock),
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    }
+  }
+
+  @Override
+  public TaskStorageAdmin getAdmin() {
+    throw new SamzaException("getAdmin() method not supported for KafkaStateBackendFactory");
+  }
+
+  /**
+   * Shared cache across all KafkaRestoreManagers for the Kafka topic
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @return StreamMetadataCache containing the stream metadata
+   */
+  private StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  /**
+   * Shared cache across KafkaRestoreManagers for the Kafka partition
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @param ssps SSPs to prefetch
+   * @return SSPMetadataCache containing the partition metadata
+   */
+  private SSPMetadataCache getSspCache(SystemAdmins admins, Clock clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, ssps);
+    }
+    return sspCache;
+  }
+
+  private Map<String, SystemStream> filterStandbySystemStreams(Map<String, SystemStream> changelogSystemStreams,
+      ContainerModel containerModel) {
+    Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
+    changelogSystemStreams.forEach((storeName, systemStream) ->
+        containerModel.getTasks().forEach((taskName, taskModel) ->
+            changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName))
+    );
+
+    Set<TaskModel> standbyTaskModels = containerModel.getTasks().values().stream()
+        .filter(taskModel -> taskModel.getTaskMode().equals(TaskMode.Standby))
+        .collect(Collectors.toSet());
+
+    // remove all standby task changelog ssps
+    standbyTaskModels.forEach((taskModel) -> {
+      changelogSystemStreams.forEach((storeName, systemStream) -> {
+        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
+        changelogSSPToStore.remove(ssp);
+      });
+    });

Review comment:
       Why not filter out the standby SSPs in the first place, when populating the map? You already have the task model, and therefore its task mode, at that point.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/KafkaTransactionalStateTaskBackupManager.scala
##########
@@ -0,0 +1,97 @@
+package org.apache.samza.storage
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util
+import java.util.concurrent.CompletableFuture
+
+import com.google.common.annotations.VisibleForTesting
+import com.google.common.collect.ImmutableSet
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId}
+import org.apache.samza.container.TaskName
+import org.apache.samza.system._
+import org.apache.samza.util.Logging
+import org.apache.samza.{Partition, SamzaException}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Manage all the storage engines for a given task
+ */
+class KafkaTransactionalStateTaskBackupManager(

Review comment:
       [P1] Same here. Consider writing this in Java.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private StreamMetadataCache streamCache;
+  private SSPMetadataCache sspCache;
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    } else {
+      return new KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(), storeChangelogs,
+          systemAdmins, taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,
+      ExecutorService restoreExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir,
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams) {
+    Map<String, SystemStream> storeChangelogs = new StorageConfig(config).getStoreChangelogs();
+    Set<SystemStreamPartition> changelogSSPs = storeChangelogs.values().stream()
+        .flatMap(ss -> containerContext.getContainerModel().getTasks().values().stream()
+            .map(tm -> new SystemStreamPartition(ss, tm.getChangelogPartition())))
+        .collect(Collectors.toSet());
+    // filter out standby store-ssp pairs
+    Map<String, SystemStream> filteredStoreChangelogs =
+        filterStandbySystemStreams(storeChangelogs, containerContext.getContainerModel());
+    SystemAdmins systemAdmins = new SystemAdmins(kafkaChangelogRestoreParams.getSystemAdmins());
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          getSspCache(systemAdmins, clock, changelogSSPs),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          kafkaChangelogRestoreParams.getStoreNames(),
+          jobContext,
+          containerContext,
+          taskModel,
+          filteredStoreChangelogs,
+          kafkaChangelogRestoreParams.getInMemoryStores(),
+          kafkaChangelogRestoreParams.getStorageEngineFactories(),
+          kafkaChangelogRestoreParams.getSerdes(),
+          systemAdmins,
+          getStreamCache(systemAdmins, clock),
+          kafkaChangelogRestoreParams.getStoreConsumers(),
+          metricsRegistry,
+          kafkaChangelogRestoreParams.getCollector(),
+          jobContext.getJobModel().getMaxChangeLogStreamPartitions(),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    }
+  }
+
+  @Override
+  public TaskStorageAdmin getAdmin() {
+    throw new SamzaException("getAdmin() method not supported for KafkaStateBackendFactory");
+  }
+
+  /**
+   * Shared cache across all KafkaRestoreManagers for the Kafka topic
+   *
+   * @param admins system admins used the fetch the stream metadata
+   * @param clock for cache invalidation
+   * @return StreamMetadataCache containing the stream metadata
+   */
+  private StreamMetadataCache getStreamCache(SystemAdmins admins, Clock clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }

Review comment:
       [P1] Do we need to worry about concurrent access here?

##########
File path: samza-kafka/src/test/java/org/apache/samza/storage/TestTaskStorageManager.scala
##########
@@ -58,7 +59,9 @@ import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
   * @param offsetFileName the name of the offset file.
   */
 @RunWith(value = classOf[Parameterized])
-class TestNonTransactionalStateTaskStorageManager(offsetFileName: String) extends MockitoSugar {
+@Ignore
+// TODO dchen fix this class

Review comment:
       [P1] Do these tests still need to be fixed and re-enabled?

##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -517,13 +514,16 @@ object SamzaContainer extends Logging {
     val loggedStorageBaseDir = getLoggedStorageBaseDir(jobConfig, defaultStoreBaseDir)
     info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
 
+    // TODO dchen should we enforce restore factories to be subset of backup factories?
+    val stateStorageBackendRestoreFactory = ReflectionUtil

Review comment:
       Are we handling this TODO as part of this PR?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {

Review comment:
       [P2] Please add Javadocs for this class.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {
+      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, checkpoint);
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(checkpoint));
+    } else {
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(null));

Review comment:
       [P2] Can we use a sentinel checkpoint instead of null?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
##########
@@ -283,6 +314,24 @@ public boolean storeExists(File storeDir) {
     }
   }
 
+  /**
+   * Read and return the {@link CheckpointV2} from the directory's {@link #CHECKPOINT_FILE_NAME} file.
+   * If the file does not exist, returns null.
+   * // TODO HIGH dchen add tests at all call sites for handling null value.
+   *
+   * @param storagePartitionDir store directory to read the checkpoint file from
+   * @return the {@link CheckpointV2} object retrieved from the checkpoint file if found, otherwise return null
+   */
+  public CheckpointV2 readCheckpointV2File(File storagePartitionDir) {
+    File checkpointFile = new File(storagePartitionDir, CHECKPOINT_FILE_NAME);
+    if (checkpointFile.exists()) {
+      String serializedCheckpointV2 = new FileUtil().readWithChecksum(checkpointFile);
+      return new CheckpointV2Serde().fromBytes(serializedCheckpointV2.getBytes());
+    } else {
+      return null;

Review comment:
       Can we return non-null here?

##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -697,10 +710,10 @@ object SamzaContainer extends Logging {
     */
   @VisibleForTesting
   private[container] def getChangelogSSPsForContainer(containerModel: ContainerModel,

Review comment:
       [P1] This method is unused since we moved the logic for fetching the changelog SSPs into the `StateBackendFactory` implementations.
   
   Please remove its usage in `SamzaContainer` and move the tests that exercise this code path from `TestSamzaContainer` to the `StateBackendFactory` tests.
   

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {
+      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, checkpoint);
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(checkpoint));
+    } else {
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(null));
+    }
+  }
+
+  /**
+   * Synchronously captures the current state of the stores in order to persist it to the backup manager
+   * in the async {@link #upload(CheckpointId, Map)} phase. Returns a map of state backend factory name to
+   * a map of store name to state checkpoint markers for all configured state backends and stores.
+   *
+   * @param checkpointId {@link CheckpointId} of the current commit
+   * @return a map of state backend factory name to a map of store name to state checkpoint markers
+   */
+  public Map<String, Map<String, String>> snapshot(CheckpointId checkpointId) {
+    // Flush all stores
+    storageEngines.values().forEach(StorageEngine::flush);
+
+    // Checkpoint all persisted and durable stores
+    storageEngines.forEach((storeName, storageEngine) -> {
+      if (storageEngine.getStoreProperties().isPersistedToDisk() &&
+          storageEngine.getStoreProperties().isDurableStore()) {
+        storageEngine.checkpoint(checkpointId);
+      }
+    });

Review comment:
       Is `isDurableStore()` equivalent to `isLoggedStore()`? We used to checkpoint stores that were persisted and logged. Can you confirm this is just a rename?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {
+      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, checkpoint);
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(checkpoint));
+    } else {
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(null));
+    }
+  }
+
+  /**
+   * Synchronously captures the current state of the stores in order to persist it to the backup manager
+   * in the async {@link #upload(CheckpointId, Map)} phase. Returns a map of state backend factory name to
+   * a map of store name to state checkpoint markers for all configured state backends and stores.
+   *
+   * @param checkpointId {@link CheckpointId} of the current commit
+   * @return a map of state backend factory name to a map of store name to state checkpoint markers
+   */
+  public Map<String, Map<String, String>> snapshot(CheckpointId checkpointId) {
+    // Flush all stores
+    storageEngines.values().forEach(StorageEngine::flush);
+
+    // Checkpoint all persisted and durable stores
+    storageEngines.forEach((storeName, storageEngine) -> {
+      if (storageEngine.getStoreProperties().isPersistedToDisk() &&
+          storageEngine.getStoreProperties().isDurableStore()) {
+        storageEngine.checkpoint(checkpointId);
+      }
+    });
+
+    // state backend factory -> store Name -> state checkpoint marker
+    Map<String, Map<String, String>> stateBackendToStoreSCMs = new HashMap<>();
+
+    // for each configured state backend factory, backup the state for all stores in this task.
+    stateBackendToBackupManager.forEach((stateBackendFactoryName, backupManager) -> {
+      Map<String, String> snapshotSCMs = backupManager.snapshot(checkpointId);
+      LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state backend: {}. Snapshot SCMs: {}",
+          taskName, checkpointId, stateBackendFactoryName, snapshotSCMs);
+      stateBackendToStoreSCMs.put(stateBackendFactoryName, snapshotSCMs);
+    });

Review comment:
       Do we need to back up the state for all stores, or do the stores within each backup factory already satisfy the above checkpointing requirement (persisted & durable)?
   
   In the Kafka implementation, the snapshot iterates over `storeChangelogs`, but the criteria above require a store to be both durable and persisted.
   
   Are changelog-enabled stores persisted to disk by default, and what is the behavior otherwise?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;

Review comment:
       [P1] Can we store the `StorageEngines` here instead of holding a handle to `ContainerStorageManager`? 

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores

Review comment:
       Is there a way to enforce or validate this assumption? See the comment above about moving this to the constructor. Is it here because the data may not be available during construction?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625484701



##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.Clock;
+
+
+/**
+ * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link TaskStorageAdmin}
+ * for a particular state storage backend, which are used to durably backup the Samza task state.
+ */
+public interface StateBackendFactory {
+  TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry taskInstanceMetricsRegistry,
+      Config config,
+      Clock clock);
+
+  TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,

Review comment:
       Will change the backup manager to use context passing as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625502332



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {
+      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, checkpoint);
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(checkpoint));
+    } else {
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(null));

Review comment:
       Will address this as a follow-up.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625484393



##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.util.Clock;
+
+
+/**
+ * Factory to build the Samza {@link TaskBackupManager}, {@link TaskRestoreManager} and {@link TaskStorageAdmin}
+ * for a particular state storage backend, which are used to durably backup the Samza task state.
+ */
+public interface StateBackendFactory {
+  TaskBackupManager getBackupManager(JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry taskInstanceMetricsRegistry,
+      Config config,
+      Clock clock);
+
+  TaskRestoreManager getRestoreManager(JobContext jobContext,
+      ContainerContext containerContext,
+      TaskModel taskModel,
+      ExecutorService restoreExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir,
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams);
+
+  TaskStorageAdmin getAdmin();

Review comment:
       We have some upcoming additions here for creating resources specific to state backends, such as blob store clients.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625499791



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {

Review comment:
       It could be null if the checkpoint manager is not configured; similar null checks exist elsewhere in the code base, such as in containerStorageManager.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625498615



##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
-  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+  val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+  val asyncUploadsCompleted = newCounter("async-uploads-completed")
+  val asyncUploadNs = newTimer("async-upload-ns")
+  val commitNs = newTimer("commit-ns")
+  val snapshotNs = newTimer("snapshot-ns")

Review comment:
       Adding to the follow-up ticket.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625500263



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created to stores

Review comment:
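       Yes, the data is not available during construction: the stores are only created by ContainerStorageManager#start(), so they are read in init() instead.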




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] dxichen commented on a change in pull request #1490: SAMZA-2591: Async Commit [2/3]: Task Commit api changes and async commit

Posted by GitBox <gi...@apache.org>.
dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625495464



##########
File path: samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
##########
@@ -697,10 +710,10 @@ object SamzaContainer extends Logging {
     */
   @VisibleForTesting
   private[container] def getChangelogSSPsForContainer(containerModel: ContainerModel,

Review comment:
       Moved, thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org