You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/04/29 12:55:12 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1491: SAMZA-2591: Async Commit [3/3]: Container restore lifecycle

mynameborat commented on a change in pull request #1491:
URL: https://github.com/apache/samza/pull/1491#discussion_r622275570



##########
File path: samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.task.MessageCollector;
+
+/**
+ * Provides the required for Kafka Changelog restore managers
+ */
+public class KafkaChangelogRestoreParams {
+  private final Map<String, SystemConsumer> storeConsumers;
+  private final Map<String, StorageEngine> inMemoryStores;
+  private final Map<String, SystemAdmin> systemAdmins;
+  private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories;
+  private final Map<String, Serde<Object>> serdes;
+  private final MessageCollector collector;
+  private final Set<String> storeNames;
+
+  public KafkaChangelogRestoreParams(
+      Map<String, SystemConsumer> storeConsumers,
+      Map<String, StorageEngine> inMemoryStores,
+      Map<String, SystemAdmin> systemAdmins,
+      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Map<String, Serde<Object>> serdes,
+      MessageCollector collector,
+      Set<String> storeNames) {
+    this.storeConsumers = storeConsumers;
+    this.inMemoryStores = inMemoryStores;
+    this.systemAdmins = systemAdmins;
+    this.storageEngineFactories = storageEngineFactories;
+    this.serdes = serdes;
+    this.collector = collector;
+    this.storeNames = storeNames;

Review comment:
       Is this set different from the `keySet` of `storageEngineFactories`? If not, why can't we infer it from that map instead of passing it in separately?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -110,17 +110,22 @@
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
   private static final String SIDEINPUTS_THREAD_NAME = "SideInputs Thread";
   private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
+  private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY =

Review comment:
       Reuse the storage engine factory constant from `StorageConfig` instead of redefining it here.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.task.MessageCollector;
+
+/**
+ * Provides the required for Kafka Changelog restore managers
+ */
+public class KafkaChangelogRestoreParams {
+  private final Map<String, SystemConsumer> storeConsumers;
+  private final Map<String, StorageEngine> inMemoryStores;
+  private final Map<String, SystemAdmin> systemAdmins;
+  private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories;
+  private final Map<String, Serde<Object>> serdes;
+  private final MessageCollector collector;
+  private final Set<String> storeNames;
+
+  public KafkaChangelogRestoreParams(
+      Map<String, SystemConsumer> storeConsumers,
+      Map<String, StorageEngine> inMemoryStores,
+      Map<String, SystemAdmin> systemAdmins,
+      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Map<String, Serde<Object>> serdes,
+      MessageCollector collector,
+      Set<String> storeNames) {
+    this.storeConsumers = storeConsumers;
+    this.inMemoryStores = inMemoryStores;
+    this.systemAdmins = systemAdmins;
+    this.storageEngineFactories = storageEngineFactories;
+    this.serdes = serdes;
+    this.collector = collector;
+    this.storeNames = storeNames;
+  }
+
+  public Map<String, SystemConsumer> getStoreConsumers() {
+    return storeConsumers;
+  }
+
+  public Map<String, StorageEngine> getInMemoryStores() {
+    return inMemoryStores;
+  }
+
+  public Map<String, SystemAdmin> getSystemAdmins() {
+    return systemAdmins;
+  }
+
+  public Map<String, StorageEngineFactory<Object, Object>> getStorageEngineFactories() {
+    return storageEngineFactories;
+  }
+
+  public Map<String, Serde<Object>> getSerdes() {
+    return serdes;
+  }
+
+  public MessageCollector getCollector() {
+    return collector;
+  }

Review comment:
       IIRC, message collectors are scoped to, and managed within, the method invocation they are passed to. According to the Javadocs:
   
   ```
   /**
    * Used as an interface for the means of sending message envelopes to an output stream.
    *
    * <p>A MessageCollector is provided on every call to {@link StreamTask#process} and
    * {@link WindowableTask#window}. You must use those MessageCollector objects only within
    * those method calls, and not hold on to a reference for use at any other time.
    */
    ```
    
    Why are we holding on to the instance here? 

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -838,15 +831,24 @@ private boolean awaitSideInputTasks() throws InterruptedException {
    * @return the task store.
    */
   public Optional<StorageEngine> getStore(TaskName taskName, String storeName) {
+    if (taskStores == null) {
+      throw new SamzaException(String.format(
+          "Attempting to access store %s for task %s before ContainerStorageManager is started.",
+          storeName, taskName));
+    }

Review comment:
       Can we use a lifecycle signal instead of a null check to determine whether initialization is complete?

##########
File path: samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
##########
@@ -43,6 +43,10 @@ public SystemAdmins(Config config, String adminLabel) {
     this.systemAdminMap = systemConfig.getSystemAdmins(adminLabel);
   }
 
+  public SystemAdmins(Map<String, SystemAdmin> systemAdminMap) {
+    this.systemAdminMap = systemAdminMap;
+  }
+

Review comment:
       Is this for testing? If so, please make it package-private and annotate it with `@VisibleForTesting`.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -388,106 +415,77 @@ public ContainerStorageManager(
   }
 
   /**
-   * Create taskStores for all stores in storageEngineFactories.
-   * The store mode is chosen as bulk-load if its a non-sideinput store, and readWrite if its a sideInput store
+   * Create taskStores for all stores in storesToCreate.
+   * The store mode is chosen as read-write mode.
    */
-  private Map<TaskName, Map<String, StorageEngine>> createTaskStores(ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
+  private Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<String> storesToCreate,

Review comment:
       We are no longer using bulk-load mode in the restore scenario. Why is that?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -612,73 +604,59 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel
             handlers.put(ssp, taskSideInputHandler);
           });
 
-          LOG.info("Created TaskSideInputHandler for task {}, sideInputStores {} and loggedStoreBaseDirectory {}",
-              taskName, sideInputStores, loggedStoreBaseDirectory);
+          LOG.info("Created TaskSideInputHandler for task {}, taskSideInputStores {} and loggedStoreBaseDirectory {}",
+              taskName, taskSideInputStores, loggedStoreBaseDirectory);
         }
       });
     }
     return handlers;
   }
 
-  private Map<String, StorageEngine> getSideInputStores(TaskName taskName) {
-    return taskStores.get(taskName).entrySet().stream().
-        filter(e -> this.taskSideInputStoreSSPs.get(taskName).containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-  }
-
-  private Map<String, StorageEngine> getNonSideInputStores(TaskName taskName) {
-    return taskStores.get(taskName).entrySet().stream().
-        filter(e -> !this.taskSideInputStoreSSPs.get(taskName).containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-  }
-
   private Set<TaskSideInputHandler> getSideInputHandlers() {
     return this.sspSideInputHandlers.values().stream().collect(Collectors.toSet());
   }
 
   public void start() throws SamzaException, InterruptedException {
-    Map<SystemStreamPartition, String> checkpointedChangelogSSPOffsets = new HashMap<>();
-    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
-      getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> {
-        if (checkpointManager != null) {
-          Set<SystemStream> changelogSystemStreams = new HashSet<>(this.changelogSystemStreams.values());
-          Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
-          if (checkpoint != null) {
-            checkpoint.getOffsets().forEach((ssp, offset) -> {
-              if (changelogSystemStreams.contains(new SystemStream(ssp.getSystem(), ssp.getStream()))) {
-                checkpointedChangelogSSPOffsets.put(ssp, offset);
-              }
-            });
-          }
-        }
-      });
+    // Restores and recreates
+    restoreStores();
+    // Shutdown restore executor since it will no longer be used
+    try {
+      restoreExecutor.shutdown();
+      if (restoreExecutor.awaitTermination(RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.MILLISECONDS)) {
+        restoreExecutor.shutdownNow();
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
     }
-    LOG.info("Checkpointed changelog ssp offsets: {}", checkpointedChangelogSSPOffsets);
-    restoreStores(checkpointedChangelogSSPOffsets);
     if (this.hasSideInputs) {
       startSideInputs();
     }
   }
 
   // Restoration of all stores, in parallel across tasks
-  private void restoreStores(Map<SystemStreamPartition, String> checkpointedChangelogSSPOffsets)
-      throws InterruptedException {
+  private void restoreStores() throws InterruptedException {
     LOG.info("Store Restore started");
 
     // initialize each TaskStorageManager
-    this.taskRestoreManagers.values().forEach(taskStorageManager ->
-       taskStorageManager.init(checkpointedChangelogSSPOffsets));
+    this.taskRestoreManagers.forEach((taskName, taskRestoreManager) -> {
+      Checkpoint taskCheckpoint = null;
+      Set<TaskName> activeTasks = getTasks(containerModel, TaskMode.Active).keySet(); // TODO HIGH dchen verify standby taskRestoreManagers behavior

Review comment:
       This can be moved outside the loop and cached to avoid repeated calls.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -388,106 +415,77 @@ public ContainerStorageManager(
   }
 
   /**
-   * Create taskStores for all stores in storageEngineFactories.
-   * The store mode is chosen as bulk-load if its a non-sideinput store, and readWrite if its a sideInput store
+   * Create taskStores for all stores in storesToCreate.
+   * The store mode is chosen as read-write mode.
    */
-  private Map<TaskName, Map<String, StorageEngine>> createTaskStores(ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
+  private Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<String> storesToCreate,
+      ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
       Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, Map<String, Serde<Object>> serdes,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
       Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
-
     Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
+    StorageConfig storageConfig = new StorageConfig(config);
 
-    // iterate over each task in the containerModel, and each store in storageEngineFactories
+    // iterate over each task and each storeName
     for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
       TaskName taskName = task.getKey();
       TaskModel taskModel = task.getValue();
-
       if (!taskStores.containsKey(taskName)) {
         taskStores.put(taskName, new HashMap<>());
       }
 
-      for (String storeName : storageEngineFactories.keySet()) {
-
-        StorageEngineFactory.StoreMode storeMode = this.taskSideInputStoreSSPs.get(taskName).containsKey(storeName) ?
-            StorageEngineFactory.StoreMode.ReadWrite : StorageEngineFactory.StoreMode.BulkLoad;
+      for (String storeName : storesToCreate) {
+        // A store is considered durable if it is backed by a changelog or another backupManager factory
+        boolean isDurable = changelogSystemStreams.containsKey(storeName) ||
+            !storageConfig.getStoreBackupManagerClassName(storeName).isEmpty();
+        boolean isSideInput = this.taskSideInputStoreSSPs.get(taskName).containsKey(storeName);

Review comment:
       We can use the `sideInputStores` set to check whether a store is a side-input store, instead of doing two lookups.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -357,25 +374,35 @@ public ContainerStorageManager(
   }
 
   private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, SystemStream> changelogSystemStreams,
-      Map<String, SystemConsumer> storeSystemConsumers) {
+      Map<String, SystemConsumer> systemNameToSystemConsumers) {
     // Map of each storeName to its respective systemConsumer
     Map<String, SystemConsumer> storeConsumers = new HashMap<>();
 
     // Populate the map of storeName to its relevant systemConsumer
     for (String storeName : changelogSystemStreams.keySet()) {
-      storeConsumers.put(storeName, storeSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
+      storeConsumers.put(storeName, systemNameToSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
     }
     return storeConsumers;
   }
 
-  private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock, SamzaContainerMetrics samzaContainerMetrics) {
+  private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(StateBackendFactory factory, Clock clock,
+      SamzaContainerMetrics samzaContainerMetrics) {
     Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
+
     containerModel.getTasks().forEach((taskName, taskModel) -> {
+      MetricsRegistry taskMetricsRegistry =
+          taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
+      Set<String> nonSideInputStoreNames = storageEngineFactories.keySet().stream()
+          .filter(storeName -> !sideInputStoreNames.contains(storeName))
+          .collect(Collectors.toSet());
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers,
+          inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes,
+          taskInstanceCollectors.get(taskName), nonSideInputStoreNames);
+
       taskRestoreManagers.put(taskName,
-          TaskRestoreManagerFactory.create(
-              taskModel, changelogSystemStreams, getNonSideInputStores(taskName), systemAdmins,
-              streamMetadataCache, sspMetadataCache, storeConsumers, maxChangeLogStreamPartitions,
-              loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock));
+          factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor,
+              taskMetricsRegistry, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
+              kafkaChangelogRestoreParams));

Review comment:
       Why do the restore managers no longer take `streamMetadataCache`?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
##########
@@ -557,6 +596,60 @@ private static void validateRestoreOffsets(RestoreOffsets restoreOffsets, System
     }
   }
 
+  private Map<String, KafkaStateCheckpointMarker> getCheckpointedChangelogOffsets(Checkpoint checkpoint) {

Review comment:
       Do we no longer need the changelog offsets keyed on `storageChangelogSSP`? Is it because we ensure that the stores handled here are non-side-input stores, and there is a 1-1 mapping between a store name and the `storeChangelogSSP` for that store?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org