Posted to commits@samza.apache.org by "prateekm (via GitHub)" <gi...@apache.org> on 2023/03/07 07:02:15 UTC

[GitHub] [samza] prateekm opened a new pull request, #1655: Refactored ContainerStorageManager for readability and manageability

prateekm opened a new pull request, #1655:
URL: https://github.com/apache/samza/pull/1655

   Issues: ContainerStorageManager has grown very unwieldy, with 1000+ LOC of highly complex, invocation-order-dependent logic. It also does too much: it tries to manage the side inputs lifecycle along with that of regular state stores.
    
   Changes: 
       1. Separated side inputs (and standby) related store creation and restore logic into a separate SideInputsManager class.
       2. Separated helper methods into a ContainerStorageManagerUtil class, and made them all static to ensure that they don't mutate class fields. 
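
   As a rough illustration of change 2 (a sketch, not code from this PR): a helper that is static and receives everything it needs as parameters cannot read or write the manager's instance fields. The example loosely mirrors the shape of the getTasks helper in the diff; StaticHelperSketch, Mode, and tasksInMode are hypothetical stand-ins, not Samza classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for a static utility class; not Samza's real API.
final class StaticHelperSketch {
  // Hypothetical stand-in for a task mode enum.
  enum Mode { ACTIVE, STANDBY }

  private StaticHelperSketch() { }

  // Static helper: works only on its arguments and returns a new map,
  // so it cannot touch any instance state of a surrounding manager class.
  static Map<String, Mode> tasksInMode(Map<String, Mode> tasks, Mode mode) {
    return tasks.entrySet().stream()
        .filter(e -> e.getValue() == mode)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, Mode> tasks = new LinkedHashMap<>();
    tasks.put("Partition 0", Mode.ACTIVE);
    tasks.put("Standby Partition 1", Mode.STANDBY);
    // The input map is left unchanged; only the filtered copy is printed.
    System.out.println(tasksInMode(tasks, Mode.STANDBY).keySet());
  }
}
```

   Because the helper's inputs and outputs are explicit, it can be unit tested without constructing the full manager.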
   
   Most of the PR simply moves code from one place to another. However, the couple of places that introduce semantic changes or otherwise require careful review are called out inline in "Refactor Note" comments.
   
   There is a lot more room for improvement (e.g. more javadocs, more tests, simpler method signatures), but this change should make it easier to reason about the lifecycle of side inputs for now.
   
   Tests: Existing ContainerStorageManager unit tests pass.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] prateekm commented on a diff in pull request #1655: Refactored ContainerStorageManager for readability and manageability

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1655:
URL: https://github.com/apache/samza/pull/1655#discussion_r1129791534


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -273,491 +200,52 @@ public ContainerStorageManager(
         );
     this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
-
-    this.sspSideInputHandlers = createSideInputHandlers(clock);
-
-    // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
-    if (this.hasSideInputs) {
-      Set<SystemStream> containerSideInputSystemStreams = this.taskSideInputStoreSSPs.values().stream()
-          .flatMap(map -> map.values().stream())
-          .flatMap(Set::stream)
-          .map(SystemStreamPartition::getSystemStream)
-          .collect(Collectors.toSet());
-
-      Set<String> containerSideInputSystems = containerSideInputSystemStreams.stream()
-          .map(SystemStream::getSystem)
-          .collect(Collectors.toSet());
-
-      // create sideInput consumers indexed by systemName
-      // Mapping from storeSystemNames to SystemConsumers
-      Map<String, SystemConsumer> sideInputConsumers =
-          createConsumers(containerSideInputSystems, systemFactories, config, this.samzaContainerMetrics.registry());
-
-      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(), false);
-
-      SystemConsumersMetrics sideInputSystemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX);
-      // we use the same registry as samza-container-metrics
-
-      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
-          sideInputSystemConsumersMetrics.registry(), systemAdmins);
-
-      ApplicationConfig applicationConfig = new ApplicationConfig(config);
-
-      sideInputSystemConsumers =
-          new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
-              sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
-              TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
-              JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, applicationConfig.getRunId());
-    }
-
-  }
-
-  /**
-   * Remove changeLogSSPs that are associated with standby tasks from changelogSSP map and only return changelogSSPs
-   * associated with the active tasks.
-   * The standby changelogs will be consumed and restored as side inputs.
-   *
-   * @param containerModel the container's model
-   * @param changelogSystemStreams the passed in set of changelogSystemStreams
-   * @return A map of changeLogSSP to storeName across all tasks, assuming no two stores have the same changelogSSP
-   */
-  @VisibleForTesting
-  Map<String, SystemStream> getActiveTaskChangelogSystemStreams(ContainerModel containerModel,
-      Map<String, SystemStream> changelogSystemStreams) {
-    if (MapUtils.invertMap(changelogSystemStreams).size() != changelogSystemStreams.size()) {
-      throw new SamzaException("Two stores cannot have the same changelog system-stream");
-    }
-
-    Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
-    changelogSystemStreams.forEach((storeName, systemStream) ->
-        containerModel.getTasks().forEach((taskName, taskModel) ->
-            changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName))
-    );
-
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-      changelogSystemStreams.forEach((storeName, systemStream) -> {
-        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
-        changelogSSPToStore.remove(ssp);
-      });
-    });
-
-    // changelogSystemStreams correspond only to active tasks (since those of standby-tasks moved to sideInputs above)
-    return MapUtils.invertMap(changelogSSPToStore).entrySet().stream()
-        .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getSystemStream()));
-  }
-
-  /**
-   * Fetch the side input stores. For active containers, the stores correspond to the side inputs and for standbys, they
-   * include the durable stores.
-   * @param containerModel the container's model
-   * @param sideInputSystemStreams the map of store to side input system streams
-   * @param changelogSystemStreams the map of store to changelog system streams
-   * @return A set of side input stores
-   */
-  @VisibleForTesting
-  Set<String> getSideInputStores(ContainerModel containerModel,
-      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
-    // add all the side input stores by default regardless of active vs standby
-    Set<String> sideInputStores = new HashSet<>(sideInputSystemStreams.keySet());
-
-    // In case of standby tasks, we treat the stores that have changelogs as side input stores for bootstrapping state
-    if (getTasks(containerModel, TaskMode.Standby).size() > 0) {
-      sideInputStores.addAll(changelogSystemStreams.keySet());
-    }
-    return sideInputStores;
-  }
-
-  /**
-   * Add all sideInputs to a map of maps, indexed first by taskName, then by sideInput store name.
-   *
-   * @param containerModel the containerModel to use
-   * @param sideInputSystemStreams the map of store to sideInput system stream
-   * @param changelogSystemStreams the map of store to changelog system stream
-   * @return taskSideInputSSPs map
-   */
-  @VisibleForTesting
-  Map<TaskName, Map<String, Set<SystemStreamPartition>>> getTaskSideInputSSPs(ContainerModel containerModel,
-      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
-    Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs = new HashMap<>();
-
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      sideInputSystemStreams.keySet().forEach(storeName -> {
-        Set<SystemStreamPartition> taskSideInputs = taskModel.getSystemStreamPartitions().stream().filter(ssp -> sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
-        taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
-      });
-    });
-
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      changelogSystemStreams.forEach((storeName, systemStream) -> {
-        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
-        taskSideInputSSPs.get(taskName).put(storeName, Collections.singleton(ssp));
-      });
-    });
-
-    return taskSideInputSSPs;
-  }
-
-  /**
-   *  Creates SystemConsumer objects for store restoration, creating one consumer per system.
-   */
-  private static Map<String, SystemConsumer> createConsumers(Set<String> storeSystems,
-      Map<String, SystemFactory> systemFactories, Config config, MetricsRegistry registry) {
-    // Create one consumer for each system in use, map with one entry for each such system
-    Map<String, SystemConsumer> consumers = new HashMap<>();
-
-    // Iterate over the list of storeSystems and create one sysConsumer per system
-    for (String storeSystemName : storeSystems) {
-      SystemFactory systemFactory = systemFactories.get(storeSystemName);
-      if (systemFactory == null) {
-        throw new SamzaException("System " + storeSystemName + " does not exist in config");
-      }
-      consumers.put(storeSystemName, systemFactory.getConsumer(storeSystemName, config, registry));
-    }
-
-    return consumers;
-  }
-
-  private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, SystemStream> changelogSystemStreams,
-      Map<String, SystemConsumer> systemNameToSystemConsumers) {
-    // Map of each storeName to its respective systemConsumer
-    Map<String, SystemConsumer> storeConsumers = new HashMap<>();
-
-    // Populate the map of storeName to its relevant systemConsumer
-    for (String storeName : changelogSystemStreams.keySet()) {
-      storeConsumers.put(storeName, systemNameToSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
-    }
-    return storeConsumers;
-  }
-
-  private Map<String, TaskRestoreManager> createTaskRestoreManagers(Map<String, StateBackendFactory> factories,
-      Map<String, Set<String>> backendFactoryStoreNames, Clock clock, SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
-      TaskModel taskModel) {
-    // Get the factories for the task based on the stores of the tasks to be restored from the factory
-    Map<String, TaskRestoreManager> backendFactoryRestoreManagers = new HashMap<>(); // backendFactoryName -> restoreManager
-    MetricsRegistry taskMetricsRegistry =
-        taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-
-    backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
-      StateBackendFactory factory = factories.get(factoryName);
-      if (factory == null) {
-        throw new SamzaException(
-            String.format("Required restore state backend factory: %s not found in configured factories %s",
-                factoryName, String.join(", ", factories.keySet())));
-      }
-      KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers,
-          inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes,
-          taskInstanceCollectors.get(taskName));
-      TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor,
-          taskMetricsRegistry, storeNames, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
-          kafkaChangelogRestoreParams);
-
-      backendFactoryRestoreManagers.put(factoryName, restoreManager);
-    });
-    samzaContainerMetrics.addStoresRestorationGauge(taskName);
-    return backendFactoryRestoreManagers;
-  }
-
-  /**
-   * Return a map of backend factory names to set of stores that should be restored using it
-   */
-  @VisibleForTesting
-  Map<String, Set<String>> getBackendFactoryStoreNames(Checkpoint checkpoint, Set<String> storeNames,
-      StorageConfig storageConfig) {
-    Map<String, Set<String>> backendFactoryStoreNames = new HashMap<>(); // backendFactoryName -> set(storeNames)
-
-    if (checkpoint != null && checkpoint.getVersion() == 1) {
-      // Only restore stores with changelog streams configured
-      Set<String> changelogStores = storeNames.stream()
-          .filter(storeName -> storageConfig.getChangelogStream(storeName).isPresent())
-          .collect(Collectors.toSet());
-      // Default to changelog backend factory when using checkpoint v1 for backwards compatibility
-      if (!changelogStores.isEmpty()) {
-        backendFactoryStoreNames.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, changelogStores);
-      }
-      if (storeNames.size() > changelogStores.size()) {
-        Set<String> nonChangelogStores = storeNames.stream()
-            .filter(storeName -> !changelogStores.contains(storeName))
-            .collect(Collectors.toSet());
-        LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1,"
-                + "restore for the store will be skipped",
-            nonChangelogStores);
-      }
-    } else if (checkpoint == null ||  checkpoint.getVersion() == 2) {
-      // Extract the state checkpoint markers if checkpoint exists
-      Map<String, Map<String, String>> stateCheckpointMarkers = checkpoint == null ? Collections.emptyMap() :
-          ((CheckpointV2) checkpoint).getStateCheckpointMarkers();
-
-      // Find stores associated to each state backend factory
-      storeNames.forEach(storeName -> {
-        List<String> storeFactories = storageConfig.getStoreRestoreFactories(storeName);
-
-        if (storeFactories.isEmpty()) {
-          // If the restore factory is not configured for the store and the store does not have a changelog topic
-          LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs,"
-                  + "restore for the store will be skipped",
-              storeName);
-        } else {
-          // Search the ordered list for the first matched state backend factory in the checkpoint
-          // If the checkpoint does not exist or state checkpoint markers does not exist, we match the first configured
-          // restore manager
-          Optional<String> factoryNameOpt = storeFactories.stream()
-              .filter(factoryName -> stateCheckpointMarkers.containsKey(factoryName) &&
-                  stateCheckpointMarkers.get(factoryName).containsKey(storeName))
-              .findFirst();
-          String factoryName;
-          if (factoryNameOpt.isPresent()) {
-            factoryName = factoryNameOpt.get();
-          } else { // Restore factories configured but no checkpoints found
-            // Use first configured restore factory
-            factoryName = storeFactories.get(0);
-            LOG.warn("No matching checkpoints found for configured factories: {}, " +
-                "defaulting to using the first configured factory with no checkpoints", storeFactories);
-          }
-          if (!backendFactoryStoreNames.containsKey(factoryName)) {
-            backendFactoryStoreNames.put(factoryName, new HashSet<>());
-          }
-          backendFactoryStoreNames.get(factoryName).add(storeName);
-        }
-      });
-    } else {
-      throw new SamzaException(String.format("Unsupported checkpoint version %s", checkpoint.getVersion()));
-    }
-    return backendFactoryStoreNames;
-  }
-
-  // Helper method to filter active Tasks from the container model
-  private static Map<TaskName, TaskModel> getTasks(ContainerModel containerModel, TaskMode taskMode) {
-    return containerModel.getTasks().entrySet().stream()
-        .filter(x -> x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
-  /**
-   * Create taskStores for all stores in storesToCreate.
-   * The store mode is chosen as read-write mode.
-   */
-  private Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<String> storesToCreate,
-      ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, Map<String, Serde<Object>> serdes,
-      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
-      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
-    Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
-    StorageConfig storageConfig = new StorageConfig(config);
-
-    // iterate over each task and each storeName
-    for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
-      TaskName taskName = task.getKey();
-      TaskModel taskModel = task.getValue();
-      if (!taskStores.containsKey(taskName)) {
-        taskStores.put(taskName, new HashMap<>());
-      }
-
-      for (String storeName : storesToCreate) {
-        List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
-        // A store is considered durable if it is backed by a changelog or another backupManager factory
-        boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
-        boolean isSideInput = this.sideInputStoreNames.contains(storeName);
-        // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
-        // for non logged stores
-        File storeBaseDir = isDurable || isSideInput ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
-        File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
-            taskModel.getTaskMode());
-        this.storeDirectoryPaths.add(storeDirectory.toPath());
-
-        // if taskInstanceMetrics are specified use those for store metrics,
-        // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
-        MetricsRegistry storeMetricsRegistry =
-            taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-
-        StorageEngine storageEngine =
-            createStore(storeName, storeDirectory, taskModel, jobContext, containerContext, storageEngineFactories,
-                serdes, storeMetricsRegistry, taskInstanceCollectors.get(taskName),
-                StorageEngineFactory.StoreMode.ReadWrite, this.changelogSystemStreams, this.config);
-
-        // add created store to map
-        taskStores.get(taskName).put(storeName, storageEngine);
-
-        LOG.info("Created task store {} in read-write mode for task {} in path {}", storeName, taskName, storeDirectory.getAbsolutePath());
-      }
-    }
-    return taskStores;
-  }
-
-  /**
-   * Method to instantiate a StorageEngine with the given parameters, and populate the storeDirectory paths (used to monitor
-   * disk space).
-   */
-  public static StorageEngine createStore(
-      String storeName,
-      File storeDirectory,
-      TaskModel taskModel,
-      JobContext jobContext,
-      ContainerContext containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
-      Map<String, Serde<Object>> serdes,
-      MetricsRegistry storeMetricsRegistry,
-      MessageCollector messageCollector,
-      StorageEngineFactory.StoreMode storeMode,
-      Map<String, SystemStream> changelogSystemStreams,
-      Config config) {
-
-    StorageConfig storageConfig = new StorageConfig(config);
-    SystemStreamPartition changeLogSystemStreamPartition = changelogSystemStreams.containsKey(storeName) ?
-        new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()) : null;
-
-    Optional<String> storageKeySerde = storageConfig.getStorageKeySerde(storeName);
-    Serde keySerde = null;
-    if (storageKeySerde.isPresent()) {
-      keySerde = serdes.get(storageKeySerde.get());
-    }
-    Optional<String> storageMsgSerde = storageConfig.getStorageMsgSerde(storeName);
-    Serde messageSerde = null;
-    if (storageMsgSerde.isPresent()) {
-      messageSerde = serdes.get(storageMsgSerde.get());
-    }
-
-    return storageEngineFactories.get(storeName)
-        .getStorageEngine(storeName, storeDirectory, keySerde, messageSerde, messageCollector,
-            storeMetricsRegistry, changeLogSystemStreamPartition, jobContext, containerContext, storeMode);
-  }
-
-
-  // Create sideInput store processors, one per store per task
-  private Map<TaskName, Map<String, SideInputsProcessor>> createSideInputProcessors(StorageConfig config,
-      ContainerModel containerModel, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
-
-    Map<TaskName, Map<String, SideInputsProcessor>> sideInputStoresToProcessors = new HashMap<>();
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      sideInputStoresToProcessors.put(taskName, new HashMap<>());
-      TaskMode taskMode = taskModel.getTaskMode();
-
-      for (String storeName : this.taskSideInputStoreSSPs.get(taskName).keySet()) {
-
-        SideInputsProcessor sideInputsProcessor;
-        Optional<String> sideInputsProcessorSerializedInstance =
-            config.getSideInputsProcessorSerializedInstance(storeName);
-
-        if (sideInputsProcessorSerializedInstance.isPresent()) {
-
-          sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get());
-          LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName);
-
-        } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) {
-          String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get();
-          SideInputsProcessorFactory sideInputsProcessorFactory =
-              ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class);
-          sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
-          LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
-
-        } else {
-          // if this is an active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy
-
-          // if this is a standby-task and the store is a non-side-input changelog store
-          // we create an identity sideInputProcessor for stores of standbyTasks
-          // have to use the right serde because the sideInput stores are created
-
-          Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
-          Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
-          sideInputsProcessor = new SideInputsProcessor() {
-            @Override
-            public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
-              // Ignore message if the key is null
-              if (message.getKey() == null) {
-                return ImmutableList.of();
-              } else {
-                // Skip serde if the message is null
-                return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
-                    message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage())));
-              }
-            }
-          };
-          LOG.info("Using identity side-inputs-processor for store: {}, task: {}", storeName, taskName);
-        }
-
-        sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
-      }
-    });
-
-    return sideInputStoresToProcessors;
-  }
-
-  // Create task sideInput storage managers, one per task, index by the SSP they are responsible for consuming
-  private Map<SystemStreamPartition, TaskSideInputHandler> createSideInputHandlers(Clock clock) {
-    // creating sideInput store processors, one per store per task
-    Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
-        createSideInputProcessors(new StorageConfig(config), this.containerModel, this.taskInstanceMetrics);
-
-    Map<SystemStreamPartition, TaskSideInputHandler> handlers = new HashMap<>();
-
-    if (this.hasSideInputs) {
-      containerModel.getTasks().forEach((taskName, taskModel) -> {
-
-        Map<String, StorageEngine> taskSideInputStores = sideInputStores.get(taskName);
-        Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new HashMap<>();
-        boolean taskHasSideInputs = false;
-        for (String storeName : taskSideInputStores.keySet()) {
-          Set<SystemStreamPartition> storeSSPs = this.taskSideInputStoreSSPs.get(taskName).get(storeName);
-          taskHasSideInputs = taskHasSideInputs || !storeSSPs.isEmpty();
-          sideInputStoresToSSPs.put(storeName, storeSSPs);
-        }
-
-        if (taskHasSideInputs) {
-          CountDownLatch taskCountDownLatch = new CountDownLatch(1);
-          this.sideInputTaskLatches.put(taskName, taskCountDownLatch);
-
-          TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(taskName,
-              taskModel.getTaskMode(),
-              loggedStoreBaseDirectory,
-              taskSideInputStores,
-              sideInputStoresToSSPs,
-              taskSideInputProcessors.get(taskName),
-              this.systemAdmins,
-              this.streamMetadataCache,
-              taskCountDownLatch,
-              clock);
-
-          sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
-            handlers.put(ssp, taskSideInputHandler);
-          });
-
-          LOG.info("Created TaskSideInputHandler for task {}, taskSideInputStores {} and loggedStoreBaseDirectory {}",
-              taskName, taskSideInputStores, loggedStoreBaseDirectory);
-        }
-      });
-    }
-    return handlers;
-  }
-
-  private Set<TaskSideInputHandler> getSideInputHandlers() {
-    return this.sspSideInputHandlers.values().stream().collect(Collectors.toSet());
-  }
 
   public void start() throws SamzaException, InterruptedException {
-    // Restores and recreates
+    // Restores and recreates stores.
     restoreStores();
+
     // Shutdown restore executor since it will no longer be used
     try {
       restoreExecutor.shutdown();
       if (restoreExecutor.awaitTermination(RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.MILLISECONDS)) {
         restoreExecutor.shutdownNow();
       }
     } catch (Exception e) {
-      LOG.error(e.getMessage());
-    }
-    if (this.hasSideInputs) {
-      startSideInputs();
+      LOG.error("Error shutting down restore executor", e);
     }
+
+    // create and restore side input stores
+    this.sideInputsManager = new SideInputsManager(
+        sideInputSystemStreams, systemFactories, activeTaskChangelogSystemStreams, storageEngineFactories, storeDirectoryPaths, containerModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, streamMetadataCache, systemAdmins, serdeManager,

Review Comment:
   Good catch! Fixed, verified, and renamed the other usages to clearly distinguish between the two.





[GitHub] [samza] prateekm merged pull request #1655: Refactored ContainerStorageManager for readability and manageability

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm merged PR #1655:
URL: https://github.com/apache/samza/pull/1655




[GitHub] [samza] mynameborat commented on a diff in pull request #1655: Refactored ContainerStorageManager for readability and manageability

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #1655:
URL: https://github.com/apache/samza/pull/1655#discussion_r1128978158


##########
samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java:
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import scala.collection.JavaConversions;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.RunLoop;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeManager;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.system.SystemConsumersMetrics;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.chooser.DefaultChooser;
+import org.apache.samza.system.chooser.MessageChooser;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.task.TaskInstanceCollector;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SideInputsManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SideInputsManager.class);
+
+  private static final String SIDE_INPUTS_THREAD_NAME = "SideInputs Thread";
+  // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer
+  private static final String SIDE_INPUTS_METRICS_PREFIX = "side-inputs-";
+
+  // Timeout with which the side input thread checks for exceptions and for whether SSPs are caught up
+  private static final int SIDE_INPUT_CHECK_TIMEOUT_SECONDS = 10;
+  private static final int SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS = 60;
+  private static final int DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR = 1;
+
+  private final SamzaContainerMetrics samzaContainerMetrics;
+  private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
+  private final Config config;
+
+  /* Sideinput related parameters */
+  private final boolean hasSideInputs;
+  private final Map<TaskName, Map<String, StorageEngine>> sideInputStores;
+  // side inputs indexed first by task, then store name
+  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
+  private final Set<String> sideInputStoreNames;
+  private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
+  private SystemConsumers sideInputSystemConsumers;
+
+  // Used by the sideInput-read thread to signal to the main thread. Map's contents are mutated.
+  private final Map<TaskName, CountDownLatch> sideInputTaskLatches;
+  private final ExecutorService sideInputsExecutor = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDE_INPUTS_THREAD_NAME).build());
+  private RunLoop sideInputRunLoop; // created in start()
+
+  private volatile boolean shouldShutdown = false;
+  private volatile Throwable sideInputException = null;
+
+  public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
+      Map<String, SystemFactory> systemFactories,
+      Map<String, SystemStream> changelogSystemStreams,
+      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Set<Path> storeDirectoryPaths,
+      ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
+      SamzaContainerMetrics samzaContainerMetrics,
+      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
+      StreamMetadataCache streamMetadataCache,
+      SystemAdmins systemAdmins,
+      SerdeManager serdeManager, Map<String, Serde<Object>> serdes,
+      StorageManagerUtil storageManagerUtil,
+      File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
+      Config config, Clock clock) {
+    this.taskSideInputStoreSSPs = getTaskSideInputSSPs(sideInputSystemStreams, changelogSystemStreams, containerModel);
+    this.sideInputStoreNames = ContainerStorageManagerUtil.getSideInputStoreNames(
+        sideInputSystemStreams, changelogSystemStreams, containerModel);
+    this.sideInputTaskLatches = new HashMap<>();
+    this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
+        .flatMap(m -> m.values().stream())
+        .flatMap(Collection::stream)
+        .findAny()
+        .isPresent();
+
+    this.taskInstanceMetrics = taskInstanceMetrics;
+    this.samzaContainerMetrics = samzaContainerMetrics;
+    this.config = config;
+
+    // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
+    this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories, sideInputStoreNames, changelogSystemStreams, storeDirectoryPaths, containerModel,
+        jobContext, containerContext, serdes, taskInstanceMetrics, taskInstanceCollectors,
+        storageManagerUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
+
+    this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs, sideInputTaskLatches, taskInstanceMetrics, containerModel, streamMetadataCache, systemAdmins, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
+    );
+
+    // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
+    if (this.hasSideInputs) {
+      Set<SystemStream> containerSideInputSystemStreams = this.taskSideInputStoreSSPs.values().stream()
+          .flatMap(map -> map.values().stream())
+          .flatMap(Set::stream)
+          .map(SystemStreamPartition::getSystemStream)
+          .collect(Collectors.toSet());
+
+      Set<String> containerSideInputSystems = containerSideInputSystemStreams.stream()
+          .map(SystemStream::getSystem)
+          .collect(Collectors.toSet());
+
+      // create sideInput consumers indexed by systemName
+      // Mapping from storeSystemNames to SystemConsumers
+      Map<String, SystemConsumer> sideInputConsumers =
+          ContainerStorageManagerUtil.createSystemConsumers(containerSideInputSystems, systemFactories,
+              samzaContainerMetrics.registry(), config);
+
+      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata =
+          streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(), false);
+
+      // we use the same registry as samza-container-metrics
+      SystemConsumersMetrics sideInputSystemConsumersMetrics =
+          new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);
+
+      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
+          sideInputSystemConsumersMetrics.registry(), systemAdmins);
+
+      ApplicationConfig applicationConfig = new ApplicationConfig(config);
+
+      this.sideInputSystemConsumers =
+          new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
+              sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
+              SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+              TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
+              JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, applicationConfig.getRunId());
+    }
+  }
+
+  // read sideInputs until all sideInputStreams are caught up, then return
+  public void start() {
+    if (this.hasSideInputs) {
+      LOG.info("SideInput Restore started");
+
+      // initialize the sideInputStorageManagers
+      this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::init);
+
+      Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
+          .distinct()
+          .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));
+
+      Map<TaskName, TaskInstanceMetrics> sideInputTaskMetrics = new HashMap<>();
+      Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
+      this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+        Set<SystemStreamPartition> taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
+
+        if (!taskSSPs.isEmpty()) {
+          String sideInputSource = SIDE_INPUTS_METRICS_PREFIX + this.taskInstanceMetrics.get(taskName).source();
+          TaskInstanceMetrics sideInputMetrics = new TaskInstanceMetrics(
+              sideInputSource, this.taskInstanceMetrics.get(taskName).registry(), SIDE_INPUTS_METRICS_PREFIX);
+          sideInputTaskMetrics.put(taskName, sideInputMetrics);
+
+          RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs,
+              taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+          sideInputTasks.put(taskName, sideInputTask);
+        }
+      });
+
+      // register all sideInput SSPs with the consumers
+      for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) {
+        String startingOffset = this.sspSideInputHandlers.get(ssp).getStartingOffset(ssp);
+
+        if (startingOffset == null) {
+          throw new SamzaException(
+              "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". Consumer cannot start.");
+        }
+
+        // register startingOffset with the sysConsumer and register a metric for it
+        sideInputSystemConsumers.register(ssp, startingOffset);
+        taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+            ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+        sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+            ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+      }
+
+      // start the systemConsumers for consuming input
+      this.sideInputSystemConsumers.start();
+
+      TaskConfig taskConfig = new TaskConfig(this.config);
+      SamzaContainerMetrics sideInputContainerMetrics =
+          new SamzaContainerMetrics(SIDE_INPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
+              this.samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);
+
+      final ApplicationConfig applicationConfig = new ApplicationConfig(config);
+
+      this.sideInputRunLoop = new RunLoop(sideInputTasks,
+          null, // all operations are executed in the main runloop thread
+          this.sideInputSystemConsumers,
+          1, // single message in flight per task
+          -1, // no windowing
+          taskConfig.getCommitMs(),
+          taskConfig.getCallbackTimeoutMs(),
+          taskConfig.getDrainCallbackTimeoutMs(),
+          // TODO consolidate these container configs SAMZA-2275
+          this.config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)),
+          taskConfig.getMaxIdleMs(),
+          sideInputContainerMetrics,
+          System::nanoTime,
+          false,
+          DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR,
+          applicationConfig.getRunId(),
+          ApplicationUtil.isHighLevelApiJob(config)
+      ); // commit must be synchronous to ensure integrity of state flush
+
+      try {
+        sideInputsExecutor.submit(() -> {
+          try {
+            sideInputRunLoop.run();
+          } catch (Exception e) {
+            LOG.error("Exception in reading sideInputs", e);
+            sideInputException = e;
+          }
+        });
+
+        // Make the main thread wait until all sideInputs have caught up or an exception was thrown
+        while (!shouldShutdown && sideInputException == null &&
+            !awaitSideInputTasks(sideInputTaskLatches)) {
+          LOG.debug("Waiting for SideInput bootstrap to complete");
+        }
+
+        if (sideInputException != null) { // Throw exception if there was an exception in catching-up sideInputs
+          throw new SamzaException("Exception in restoring sideInputs", sideInputException);
+        }
+
+      } catch (InterruptedException e) {
+        LOG.warn("Received an interrupt during side inputs store restoration."
+            + " Exiting prematurely without completing store restore.");
+        /*
+         * We want to stop side input restoration and rethrow the exception upstream. The container should handle the
+         * interrupt exception, shut down the components, and clean up the resources. We don't want to clean up the
+         * resources prematurely here.
+         */
+        shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
+        throw new SamzaException("Side inputs read was interrupted", e);
+      }
+
+      LOG.info("SideInput Restore complete");
+    }
+  }
+
+  public Map<TaskName, Map<String, StorageEngine>> getSideInputStores() {
+    return ImmutableMap.copyOf(this.sideInputStores);
+  }
+
+  public void shutdown() {
+    // stop all side input consumers and stores

Review Comment:
   Do we need to set `shouldShutdown = true` here for behavior parity with the pre-refactor code? The wait loop in `start()` checks that flag.
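
To illustrate the concern, here is a minimal sketch of the flag-and-latch pattern. It assumes the wait loop in `start()` is what reads `shouldShutdown`. The class `ShutdownFlagSketch` and its methods are hypothetical stand-ins, not the PR's code. If `shutdown()` never sets the flag, a thread blocked in `start()` keeps polling the latch until the side inputs catch up or an exception is recorded.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the manager's wait loop; names are illustrative only. */
public class ShutdownFlagSketch {
  // Counted down once the (simulated) side inputs are caught up.
  private final CountDownLatch caughtUp = new CountDownLatch(1);

  // volatile so that a write from the shutdown thread is visible to the waiting thread.
  private volatile boolean shouldShutdown = false;

  /**
   * Polls the latch with a timeout until either the side inputs are caught up
   * or shutdown has been requested. Returns true only if caught up.
   */
  public boolean start(long checkTimeoutMs) throws InterruptedException {
    while (!shouldShutdown) {
      if (caughtUp.await(checkTimeoutMs, TimeUnit.MILLISECONDS)) {
        return true;
      }
    }
    return false;
  }

  /** Simulates the read thread signalling that all side inputs are caught up. */
  public void markCaughtUp() {
    caughtUp.countDown();
  }

  /** Setting the flag first lets a blocked start() exit at its next timeout check. */
  public void shutdown() {
    shouldShutdown = true;
    // stopping consumers and stores would follow here
  }
}
```

With the flag set in `shutdown()`, a caller blocked in `start()` returns within roughly one check timeout instead of waiting for the latch.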



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -273,491 +200,52 @@ public ContainerStorageManager(
         );
     this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
-
-    this.sspSideInputHandlers = createSideInputHandlers(clock);
-
-    // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
-    if (this.hasSideInputs) {
-      Set<SystemStream> containerSideInputSystemStreams = this.taskSideInputStoreSSPs.values().stream()
-          .flatMap(map -> map.values().stream())
-          .flatMap(Set::stream)
-          .map(SystemStreamPartition::getSystemStream)
-          .collect(Collectors.toSet());
-
-      Set<String> containerSideInputSystems = containerSideInputSystemStreams.stream()
-          .map(SystemStream::getSystem)
-          .collect(Collectors.toSet());
-
-      // create sideInput consumers indexed by systemName
-      // Mapping from storeSystemNames to SystemConsumers
-      Map<String, SystemConsumer> sideInputConsumers =
-          createConsumers(containerSideInputSystems, systemFactories, config, this.samzaContainerMetrics.registry());
-
-      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(), false);
-
-      SystemConsumersMetrics sideInputSystemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX);
-      // we use the same registry as samza-container-metrics
-
-      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
-          sideInputSystemConsumersMetrics.registry(), systemAdmins);
-
-      ApplicationConfig applicationConfig = new ApplicationConfig(config);
-
-      sideInputSystemConsumers =
-          new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
-              sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
-              TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
-              JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, applicationConfig.getRunId());
-    }
-
-  }
-
-  /**
-   * Remove changeLogSSPs that are associated with standby tasks from changelogSSP map and only return changelogSSPs
-   * associated with the active tasks.
-   * The standby changelogs will be consumed and restored as side inputs.
-   *
-   * @param containerModel the container's model
-   * @param changelogSystemStreams the passed in set of changelogSystemStreams
-   * @return A map of changeLogSSP to storeName across all tasks, assuming no two stores have the same changelogSSP
-   */
-  @VisibleForTesting
-  Map<String, SystemStream> getActiveTaskChangelogSystemStreams(ContainerModel containerModel,
-      Map<String, SystemStream> changelogSystemStreams) {
-    if (MapUtils.invertMap(changelogSystemStreams).size() != changelogSystemStreams.size()) {
-      throw new SamzaException("Two stores cannot have the same changelog system-stream");
-    }
-
-    Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
-    changelogSystemStreams.forEach((storeName, systemStream) ->
-        containerModel.getTasks().forEach((taskName, taskModel) ->
-            changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName))
-    );
-
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-      changelogSystemStreams.forEach((storeName, systemStream) -> {
-        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
-        changelogSSPToStore.remove(ssp);
-      });
-    });
-
-    // changelogSystemStreams correspond only to active tasks (since those of standby-tasks moved to sideInputs above)
-    return MapUtils.invertMap(changelogSSPToStore).entrySet().stream()
-        .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getSystemStream()));
-  }
-
-  /**
-   * Fetch the side input stores. For active containers, the stores correspond to the side inputs and for standbys, they
-   * include the durable stores.
-   * @param containerModel the container's model
-   * @param sideInputSystemStreams the map of store to side input system streams
-   * @param changelogSystemStreams the map of store to changelog system streams
-   * @return A set of side input stores
-   */
-  @VisibleForTesting
-  Set<String> getSideInputStores(ContainerModel containerModel,
-      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
-    // add all the side input stores by default regardless of active vs standby
-    Set<String> sideInputStores = new HashSet<>(sideInputSystemStreams.keySet());
-
-    // In case of standby tasks, we treat the stores that have changelogs as side input stores for bootstrapping state
-    if (getTasks(containerModel, TaskMode.Standby).size() > 0) {
-      sideInputStores.addAll(changelogSystemStreams.keySet());
-    }
-    return sideInputStores;
-  }
-
-  /**
-   * Add all sideInputs to a map of maps, indexed first by taskName, then by sideInput store name.
-   *
-   * @param containerModel the containerModel to use
-   * @param sideInputSystemStreams the map of store to sideInput system stream
-   * @param changelogSystemStreams the map of store to changelog system stream
-   * @return taskSideInputSSPs map
-   */
-  @VisibleForTesting
-  Map<TaskName, Map<String, Set<SystemStreamPartition>>> getTaskSideInputSSPs(ContainerModel containerModel,
-      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
-    Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs = new HashMap<>();
-
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      sideInputSystemStreams.keySet().forEach(storeName -> {
-        Set<SystemStreamPartition> taskSideInputs = taskModel.getSystemStreamPartitions().stream().filter(ssp -> sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
-        taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
-      });
-    });
-
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      changelogSystemStreams.forEach((storeName, systemStream) -> {
-        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
-        taskSideInputSSPs.get(taskName).put(storeName, Collections.singleton(ssp));
-      });
-    });
-
-    return taskSideInputSSPs;
-  }
-
-  /**
-   *  Creates SystemConsumer objects for store restoration, creating one consumer per system.
-   */
-  private static Map<String, SystemConsumer> createConsumers(Set<String> storeSystems,
-      Map<String, SystemFactory> systemFactories, Config config, MetricsRegistry registry) {
-    // Create one consumer for each system in use, map with one entry for each such system
-    Map<String, SystemConsumer> consumers = new HashMap<>();
-
-    // Iterate over the list of storeSystems and create one sysConsumer per system
-    for (String storeSystemName : storeSystems) {
-      SystemFactory systemFactory = systemFactories.get(storeSystemName);
-      if (systemFactory == null) {
-        throw new SamzaException("System " + storeSystemName + " does not exist in config");
-      }
-      consumers.put(storeSystemName, systemFactory.getConsumer(storeSystemName, config, registry));
-    }
-
-    return consumers;
-  }
-
-  private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, SystemStream> changelogSystemStreams,
-      Map<String, SystemConsumer> systemNameToSystemConsumers) {
-    // Map of each storeName to its respective systemConsumer
-    Map<String, SystemConsumer> storeConsumers = new HashMap<>();
-
-    // Populate the map of storeName to its relevant systemConsumer
-    for (String storeName : changelogSystemStreams.keySet()) {
-      storeConsumers.put(storeName, systemNameToSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
-    }
-    return storeConsumers;
-  }
-
-  private Map<String, TaskRestoreManager> createTaskRestoreManagers(Map<String, StateBackendFactory> factories,
-      Map<String, Set<String>> backendFactoryStoreNames, Clock clock, SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
-      TaskModel taskModel) {
-    // Get the factories for the task based on the stores of the tasks to be restored from the factory
-    Map<String, TaskRestoreManager> backendFactoryRestoreManagers = new HashMap<>(); // backendFactoryName -> restoreManager
-    MetricsRegistry taskMetricsRegistry =
-        taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-
-    backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
-      StateBackendFactory factory = factories.get(factoryName);
-      if (factory == null) {
-        throw new SamzaException(
-            String.format("Required restore state backend factory: %s not found in configured factories %s",
-                factoryName, String.join(", ", factories.keySet())));
-      }
-      KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers,
-          inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes,
-          taskInstanceCollectors.get(taskName));
-      TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor,
-          taskMetricsRegistry, storeNames, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
-          kafkaChangelogRestoreParams);
-
-      backendFactoryRestoreManagers.put(factoryName, restoreManager);
-    });
-    samzaContainerMetrics.addStoresRestorationGauge(taskName);
-    return backendFactoryRestoreManagers;
-  }
-
-  /**
-   * Return a map of backend factory names to set of stores that should be restored using it
-   */
-  @VisibleForTesting
-  Map<String, Set<String>> getBackendFactoryStoreNames(Checkpoint checkpoint, Set<String> storeNames,
-      StorageConfig storageConfig) {
-    Map<String, Set<String>> backendFactoryStoreNames = new HashMap<>(); // backendFactoryName -> set(storeNames)
-
-    if (checkpoint != null && checkpoint.getVersion() == 1) {
-      // Only restore stores with changelog streams configured
-      Set<String> changelogStores = storeNames.stream()
-          .filter(storeName -> storageConfig.getChangelogStream(storeName).isPresent())
-          .collect(Collectors.toSet());
-      // Default to changelog backend factory when using checkpoint v1 for backwards compatibility
-      if (!changelogStores.isEmpty()) {
-        backendFactoryStoreNames.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, changelogStores);
-      }
-      if (storeNames.size() > changelogStores.size()) {
-        Set<String> nonChangelogStores = storeNames.stream()
-            .filter(storeName -> !changelogStores.contains(storeName))
-            .collect(Collectors.toSet());
-        LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1,"
-                + "restore for the store will be skipped",
-            nonChangelogStores);
-      }
-    } else if (checkpoint == null ||  checkpoint.getVersion() == 2) {
-      // Extract the state checkpoint markers if checkpoint exists
-      Map<String, Map<String, String>> stateCheckpointMarkers = checkpoint == null ? Collections.emptyMap() :
-          ((CheckpointV2) checkpoint).getStateCheckpointMarkers();
-
-      // Find stores associated to each state backend factory
-      storeNames.forEach(storeName -> {
-        List<String> storeFactories = storageConfig.getStoreRestoreFactories(storeName);
-
-        if (storeFactories.isEmpty()) {
-          // If the restore factory is not configured for the store and the store does not have a changelog topic
-          LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs,"
-                  + "restore for the store will be skipped",
-              storeName);
-        } else {
-          // Search the ordered list for the first matched state backend factory in the checkpoint
-          // If the checkpoint does not exist or state checkpoint markers does not exist, we match the first configured
-          // restore manager
-          Optional<String> factoryNameOpt = storeFactories.stream()
-              .filter(factoryName -> stateCheckpointMarkers.containsKey(factoryName) &&
-                  stateCheckpointMarkers.get(factoryName).containsKey(storeName))
-              .findFirst();
-          String factoryName;
-          if (factoryNameOpt.isPresent()) {
-            factoryName = factoryNameOpt.get();
-          } else { // Restore factories configured but no checkpoints found
-            // Use first configured restore factory
-            factoryName = storeFactories.get(0);
-            LOG.warn("No matching checkpoints found for configured factories: {}, " +
-                "defaulting to using the first configured factory with no checkpoints", storeFactories);
-          }
-          if (!backendFactoryStoreNames.containsKey(factoryName)) {
-            backendFactoryStoreNames.put(factoryName, new HashSet<>());
-          }
-          backendFactoryStoreNames.get(factoryName).add(storeName);
-        }
-      });
-    } else {
-      throw new SamzaException(String.format("Unsupported checkpoint version %s", checkpoint.getVersion()));
-    }
-    return backendFactoryStoreNames;
-  }
-
-  // Helper method to filter active Tasks from the container model
-  private static Map<TaskName, TaskModel> getTasks(ContainerModel containerModel, TaskMode taskMode) {
-    return containerModel.getTasks().entrySet().stream()
-        .filter(x -> x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
-  /**
-   * Create taskStores for all stores in storesToCreate.
-   * The store mode is chosen as read-write mode.
-   */
-  private Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<String> storesToCreate,
-      ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, Map<String, Serde<Object>> serdes,
-      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
-      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
-    Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
-    StorageConfig storageConfig = new StorageConfig(config);
-
-    // iterate over each task and each storeName
-    for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
-      TaskName taskName = task.getKey();
-      TaskModel taskModel = task.getValue();
-      if (!taskStores.containsKey(taskName)) {
-        taskStores.put(taskName, new HashMap<>());
-      }
-
-      for (String storeName : storesToCreate) {
-        List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
-        // A store is considered durable if it is backed by a changelog or another backupManager factory
-        boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
-        boolean isSideInput = this.sideInputStoreNames.contains(storeName);
-        // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
-        // for non logged stores
-        File storeBaseDir = isDurable || isSideInput ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
-        File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
-            taskModel.getTaskMode());
-        this.storeDirectoryPaths.add(storeDirectory.toPath());
-
-        // if taskInstanceMetrics are specified use those for store metrics,
-        // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
-        MetricsRegistry storeMetricsRegistry =
-            taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-
-        StorageEngine storageEngine =
-            createStore(storeName, storeDirectory, taskModel, jobContext, containerContext, storageEngineFactories,
-                serdes, storeMetricsRegistry, taskInstanceCollectors.get(taskName),
-                StorageEngineFactory.StoreMode.ReadWrite, this.changelogSystemStreams, this.config);
-
-        // add created store to map
-        taskStores.get(taskName).put(storeName, storageEngine);
-
-        LOG.info("Created task store {} in read-write mode for task {} in path {}", storeName, taskName, storeDirectory.getAbsolutePath());
-      }
-    }
-    return taskStores;
-  }
-
-  /**
-   * Method to instantiate a StorageEngine with the given parameters, and populate the storeDirectory paths (used to monitor
-   * disk space).
-   */
-  public static StorageEngine createStore(
-      String storeName,
-      File storeDirectory,
-      TaskModel taskModel,
-      JobContext jobContext,
-      ContainerContext containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
-      Map<String, Serde<Object>> serdes,
-      MetricsRegistry storeMetricsRegistry,
-      MessageCollector messageCollector,
-      StorageEngineFactory.StoreMode storeMode,
-      Map<String, SystemStream> changelogSystemStreams,
-      Config config) {
-
-    StorageConfig storageConfig = new StorageConfig(config);
-    SystemStreamPartition changeLogSystemStreamPartition = changelogSystemStreams.containsKey(storeName) ?
-        new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()) : null;
-
-    Optional<String> storageKeySerde = storageConfig.getStorageKeySerde(storeName);
-    Serde keySerde = null;
-    if (storageKeySerde.isPresent()) {
-      keySerde = serdes.get(storageKeySerde.get());
-    }
-    Optional<String> storageMsgSerde = storageConfig.getStorageMsgSerde(storeName);
-    Serde messageSerde = null;
-    if (storageMsgSerde.isPresent()) {
-      messageSerde = serdes.get(storageMsgSerde.get());
-    }
-
-    return storageEngineFactories.get(storeName)
-        .getStorageEngine(storeName, storeDirectory, keySerde, messageSerde, messageCollector,
-            storeMetricsRegistry, changeLogSystemStreamPartition, jobContext, containerContext, storeMode);
-  }
-
-
-  // Create sideInput store processors, one per store per task
-  private Map<TaskName, Map<String, SideInputsProcessor>> createSideInputProcessors(StorageConfig config,
-      ContainerModel containerModel, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
-
-    Map<TaskName, Map<String, SideInputsProcessor>> sideInputStoresToProcessors = new HashMap<>();
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      sideInputStoresToProcessors.put(taskName, new HashMap<>());
-      TaskMode taskMode = taskModel.getTaskMode();
-
-      for (String storeName : this.taskSideInputStoreSSPs.get(taskName).keySet()) {
-
-        SideInputsProcessor sideInputsProcessor;
-        Optional<String> sideInputsProcessorSerializedInstance =
-            config.getSideInputsProcessorSerializedInstance(storeName);
-
-        if (sideInputsProcessorSerializedInstance.isPresent()) {
-
-          sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get());
-          LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName);
-
-        } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) {
-          String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get();
-          SideInputsProcessorFactory sideInputsProcessorFactory =
-              ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class);
-          sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
-          LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
-
-        } else {
-          // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy
-
-          // if this is a standby-task and the store is a non-side-input changelog store
-          // we creating identity sideInputProcessor for stores of standbyTasks
-          // have to use the right serde because the sideInput stores are created
-
-          Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
-          Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
-          sideInputsProcessor = new SideInputsProcessor() {
-            @Override
-            public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
-              // Ignore message if the key is null
-              if (message.getKey() == null) {
-                return ImmutableList.of();
-              } else {
-                // Skip serde if the message is null
-                return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
-                    message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage())));
-              }
-            }
-          };
-          LOG.info("Using identity side-inputs-processor for store: {}, task: {}", storeName, taskName);
-        }
-
-        sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
-      }
-    });
-
-    return sideInputStoresToProcessors;
-  }
-
-  // Create task sideInput storage managers, one per task, index by the SSP they are responsible for consuming
-  private Map<SystemStreamPartition, TaskSideInputHandler> createSideInputHandlers(Clock clock) {
-    // creating sideInput store processors, one per store per task
-    Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
-        createSideInputProcessors(new StorageConfig(config), this.containerModel, this.taskInstanceMetrics);
-
-    Map<SystemStreamPartition, TaskSideInputHandler> handlers = new HashMap<>();
-
-    if (this.hasSideInputs) {
-      containerModel.getTasks().forEach((taskName, taskModel) -> {
-
-        Map<String, StorageEngine> taskSideInputStores = sideInputStores.get(taskName);
-        Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new HashMap<>();
-        boolean taskHasSideInputs = false;
-        for (String storeName : taskSideInputStores.keySet()) {
-          Set<SystemStreamPartition> storeSSPs = this.taskSideInputStoreSSPs.get(taskName).get(storeName);
-          taskHasSideInputs = taskHasSideInputs || !storeSSPs.isEmpty();
-          sideInputStoresToSSPs.put(storeName, storeSSPs);
-        }
-
-        if (taskHasSideInputs) {
-          CountDownLatch taskCountDownLatch = new CountDownLatch(1);
-          this.sideInputTaskLatches.put(taskName, taskCountDownLatch);
-
-          TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(taskName,
-              taskModel.getTaskMode(),
-              loggedStoreBaseDirectory,
-              taskSideInputStores,
-              sideInputStoresToSSPs,
-              taskSideInputProcessors.get(taskName),
-              this.systemAdmins,
-              this.streamMetadataCache,
-              taskCountDownLatch,
-              clock);
-
-          sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
-            handlers.put(ssp, taskSideInputHandler);
-          });
-
-          LOG.info("Created TaskSideInputHandler for task {}, taskSideInputStores {} and loggedStoreBaseDirectory {}",
-              taskName, taskSideInputStores, loggedStoreBaseDirectory);
-        }
-      });
-    }
-    return handlers;
-  }
-
-  private Set<TaskSideInputHandler> getSideInputHandlers() {
-    return this.sspSideInputHandlers.values().stream().collect(Collectors.toSet());
-  }
 
   public void start() throws SamzaException, InterruptedException {
-    // Restores and recreates
+    // Restores and recreates stores.
     restoreStores();
+
     // Shutdown restore executor since it will no longer be used
     try {
       restoreExecutor.shutdown();
       if (restoreExecutor.awaitTermination(RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.MILLISECONDS)) {
         restoreExecutor.shutdownNow();
       }
     } catch (Exception e) {
-      LOG.error(e.getMessage());
-    }
-    if (this.hasSideInputs) {
-      startSideInputs();
+      LOG.error("Error shutting down restore executor", e);
     }
+
+    // create and restore side input stores
+    this.sideInputsManager = new SideInputsManager(
+        sideInputSystemStreams, systemFactories, activeTaskChangelogSystemStreams, storageEngineFactories, storeDirectoryPaths, containerModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, streamMetadataCache, systemAdmins, serdeManager,

Review Comment:
   Shouldn't `activeTaskChangelogSystemStreams` be replaced by the `changelogSystemStreams` passed to the constructor of `ContainerStorageManager`?
   
   e.g., 
   ```
     */
     @VisibleForTesting
     Map<TaskName, Map<String, Set<SystemStreamPartition>>> getTaskSideInputSSPs(ContainerModel containerModel,
         Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
   ```
   This is the method CSM used prior to the refactor, and it was called with these arguments:
   ```this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel, sideInputSystemStreams, changelogSystemStreams);```
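
To illustrate why the choice of map could matter, here is a minimal, self-contained sketch. It is not Samza code: `ChangelogMapSketch`, `standbySideInputStores` and `activeOnly` are hypothetical names, streams are modelled as plain strings, and the sketch assumes (a) that side inputs for a standby task are derived from whichever changelog map is passed in, and (b) that the active-only map drops entries when the container hosts no active task.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ChangelogMapSketch {
  enum TaskMode { ACTIVE, STANDBY }

  // Hypothetical stand-in for getTaskSideInputSSPs: for a standby task, every
  // store present in the supplied changelog map is treated as a side input.
  static Map<String, Set<String>> standbySideInputStores(
      Map<String, TaskMode> tasks, Map<String, String> changelogStreams) {
    Map<String, Set<String>> result = new HashMap<>();
    tasks.forEach((taskName, mode) -> {
      Set<String> stores = new TreeSet<>();
      if (mode == TaskMode.STANDBY) {
        stores.addAll(changelogStreams.keySet());
      }
      result.put(taskName, stores);
    });
    return result;
  }

  // Hypothetical filter (an assumption, not the real helper): keep changelog
  // entries only if the container hosts at least one active task.
  static Map<String, String> activeOnly(
      Map<String, TaskMode> tasks, Map<String, String> changelogStreams) {
    return tasks.containsValue(TaskMode.ACTIVE)
        ? new HashMap<>(changelogStreams)
        : new HashMap<>();
  }

  public static void main(String[] args) {
    Map<String, TaskMode> tasks = new HashMap<>();
    tasks.put("Standby-Partition 0", TaskMode.STANDBY);

    Map<String, String> changelogs = new HashMap<>();
    changelogs.put("store-a", "kafka.store-a-changelog");

    // Full map: the standby task picks up store-a as a side input.
    System.out.println(standbySideInputStores(tasks, changelogs));
    // Filtered map: the standby task ends up with no side inputs at all.
    System.out.println(standbySideInputStores(tasks, activeOnly(tasks, changelogs)));
  }
}
```

Under those assumptions, passing the filtered map leaves a standby-only container with nothing to restore, which is the behaviour change this comment is asking about.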
   





[GitHub] [samza] prateekm commented on a diff in pull request #1655: Refactored ContainerStorageManager for readability and manageability

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1655:
URL: https://github.com/apache/samza/pull/1655#discussion_r1129791438


##########
samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java:
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import scala.collection.JavaConversions;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.RunLoop;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeManager;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.system.SystemConsumersMetrics;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.chooser.DefaultChooser;
+import org.apache.samza.system.chooser.MessageChooser;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.task.TaskInstanceCollector;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SideInputsManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SideInputsManager.class);
+
+  private static final String SIDE_INPUTS_THREAD_NAME = "SideInputs Thread";
+  // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer
+  private static final String SIDE_INPUTS_METRICS_PREFIX = "side-inputs-";
+
+  // Timeout with which the side input thread checks for exceptions and for whether SSPs are caught up
+  private static final int SIDE_INPUT_CHECK_TIMEOUT_SECONDS = 10;
+  private static final int SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS = 60;
+  private static final int DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR = 1;
+
+  private final SamzaContainerMetrics samzaContainerMetrics;
+  private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
+  private final Config config;
+
+  /* Sideinput related parameters */
+  private final boolean hasSideInputs;
+  private final Map<TaskName, Map<String, StorageEngine>> sideInputStores;
+  // side inputs indexed first by task, then store name
+  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
+  private final Set<String> sideInputStoreNames;
+  private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
+  private SystemConsumers sideInputSystemConsumers;
+
+  // Used by the sideInput-read thread to signal to the main thread. Map's contents are mutated.
+  private final Map<TaskName, CountDownLatch> sideInputTaskLatches;
+  private final ExecutorService sideInputsExecutor = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDE_INPUTS_THREAD_NAME).build());
+  private RunLoop sideInputRunLoop; // created in start()
+
+  private volatile boolean shouldShutdown = false;
+  private volatile Throwable sideInputException = null;
+
+  public SideInputsManager(Map<String, Set<SystemStream>> sideInputSystemStreams,
+      Map<String, SystemFactory> systemFactories,
+      Map<String, SystemStream> changelogSystemStreams,
+      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Set<Path> storeDirectoryPaths,
+      ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
+      SamzaContainerMetrics samzaContainerMetrics,
+      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
+      StreamMetadataCache streamMetadataCache,
+      SystemAdmins systemAdmins,
+      SerdeManager serdeManager, Map<String, Serde<Object>> serdes,
+      StorageManagerUtil storageManagerUtil,
+      File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
+      Config config, Clock clock) {
+    this.taskSideInputStoreSSPs = getTaskSideInputSSPs(sideInputSystemStreams, changelogSystemStreams, containerModel);
+    this.sideInputStoreNames = ContainerStorageManagerUtil.getSideInputStoreNames(
+        sideInputSystemStreams, changelogSystemStreams, containerModel);
+    this.sideInputTaskLatches = new HashMap<>();
+    this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
+        .flatMap(m -> m.values().stream())
+        .flatMap(Collection::stream)
+        .findAny()
+        .isPresent();
+
+    this.taskInstanceMetrics = taskInstanceMetrics;
+    this.samzaContainerMetrics = samzaContainerMetrics;
+    this.config = config;
+
+    // create side input taskStores for all tasks in the containerModel and each store in storageEngineFactories
+    this.sideInputStores = ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, storageEngineFactories, sideInputStoreNames, changelogSystemStreams, storeDirectoryPaths, containerModel,
+        jobContext, containerContext, serdes, taskInstanceMetrics, taskInstanceCollectors,
+        storageManagerUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
+
+    this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, sideInputStores, taskSideInputStoreSSPs, sideInputTaskLatches, taskInstanceMetrics, containerModel, streamMetadataCache, systemAdmins, serdes, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
+    );
+
+    // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
+    if (this.hasSideInputs) {
+      Set<SystemStream> containerSideInputSystemStreams = this.taskSideInputStoreSSPs.values().stream()
+          .flatMap(map -> map.values().stream())
+          .flatMap(Set::stream)
+          .map(SystemStreamPartition::getSystemStream)
+          .collect(Collectors.toSet());
+
+      Set<String> containerSideInputSystems = containerSideInputSystemStreams.stream()
+          .map(SystemStream::getSystem)
+          .collect(Collectors.toSet());
+
+      // create sideInput consumers indexed by systemName
+      // Mapping from storeSystemNames to SystemConsumers
+      Map<String, SystemConsumer> sideInputConsumers =
+          ContainerStorageManagerUtil.createSystemConsumers(containerSideInputSystems, systemFactories,
+              samzaContainerMetrics.registry(), config);
+
+      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata =
+          streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(), false);
+
+      // we use the same registry as samza-container-metrics
+      SystemConsumersMetrics sideInputSystemConsumersMetrics =
+          new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);
+
+      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
+          sideInputSystemConsumersMetrics.registry(), systemAdmins);
+
+      ApplicationConfig applicationConfig = new ApplicationConfig(config);
+
+      this.sideInputSystemConsumers =
+          new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
+              sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
+              SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+              TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
+              JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, applicationConfig.getRunId());
+    }
+  }
+
+  // read sideInputs until all sideInputStreams are caught up, then return
+  public void start() {
+    if (this.hasSideInputs) {
+      LOG.info("SideInput Restore started");
+
+      // initialize the sideInputStorageManagers
+      this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::init);
+
+      Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
+          .distinct()
+          .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));
+
+      Map<TaskName, TaskInstanceMetrics> sideInputTaskMetrics = new HashMap<>();
+      Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
+      this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+        Set<SystemStreamPartition> taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
+
+        if (!taskSSPs.isEmpty()) {
+          String sideInputSource = SIDE_INPUTS_METRICS_PREFIX + this.taskInstanceMetrics.get(taskName).source();
+          TaskInstanceMetrics sideInputMetrics = new TaskInstanceMetrics(
+              sideInputSource, this.taskInstanceMetrics.get(taskName).registry(), SIDE_INPUTS_METRICS_PREFIX);
+          sideInputTaskMetrics.put(taskName, sideInputMetrics);
+
+          RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs,
+              taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+          sideInputTasks.put(taskName, sideInputTask);
+        }
+      });
+
+      // register all sideInput SSPs with the consumers
+      for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) {
+        String startingOffset = this.sspSideInputHandlers.get(ssp).getStartingOffset(ssp);
+
+        if (startingOffset == null) {
+          throw new SamzaException(
+              "No starting offset could be obtained for SideInput SystemStreamPartition : " + ssp + ". Consumer cannot start.");
+        }
+
+        // register startingOffset with the sysConsumer and register a metric for it
+        sideInputSystemConsumers.register(ssp, startingOffset);
+        taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+            ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+        sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+            ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+      }
+
+      // start the systemConsumers for consuming input
+      this.sideInputSystemConsumers.start();
+
+      TaskConfig taskConfig = new TaskConfig(this.config);
+      SamzaContainerMetrics sideInputContainerMetrics =
+          new SamzaContainerMetrics(SIDE_INPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
+              this.samzaContainerMetrics.registry(), SIDE_INPUTS_METRICS_PREFIX);
+
+      final ApplicationConfig applicationConfig = new ApplicationConfig(config);
+
+      this.sideInputRunLoop = new RunLoop(sideInputTasks,
+          null, // all operations are executed in the main runloop thread
+          this.sideInputSystemConsumers,
+          1, // single message in flight per task
+          -1, // no windowing
+          taskConfig.getCommitMs(),
+          taskConfig.getCallbackTimeoutMs(),
+          taskConfig.getDrainCallbackTimeoutMs(),
+          // TODO consolidate these container configs SAMZA-2275
+          this.config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)),
+          taskConfig.getMaxIdleMs(),
+          sideInputContainerMetrics,
+          System::nanoTime,
+          false,
+          DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR,
+          applicationConfig.getRunId(),
+          ApplicationUtil.isHighLevelApiJob(config)
+      ); // commit must be synchronous to ensure integrity of state flush
+
+      try {
+        sideInputsExecutor.submit(() -> {
+          try {
+            sideInputRunLoop.run();
+          } catch (Exception e) {
+            LOG.error("Exception in reading sideInputs", e);
+            sideInputException = e;
+          }
+        });
+
+        // Make the main thread wait until all sideInputs have been caughtup or an exception was thrown
+        while (!shouldShutdown && sideInputException == null &&
+            !awaitSideInputTasks(sideInputTaskLatches)) {
+          LOG.debug("Waiting for SideInput bootstrap to complete");
+        }
+
+        if (sideInputException != null) { // Throw exception if there was an exception in catching-up sideInputs
+          throw new SamzaException("Exception in restoring sideInputs", sideInputException);
+        }
+
+      } catch (InterruptedException e) {
+        LOG.warn("Received an interrupt during side inputs store restoration."
+            + " Exiting prematurely without completing store restore.");
+        /*
+         * We want to stop side input restoration and rethrow the exception upstream. Container should handle the
+         * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the
+         * resources prematurely here.
+         */
+        shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
+        throw new SamzaException("Side inputs read was interrupted", e);
+      }
+
+      LOG.info("SideInput Restore complete");
+    }
+  }
+
+  public Map<TaskName, Map<String, StorageEngine>> getSideInputStores() {
+    return ImmutableMap.copyOf(this.sideInputStores);
+  }
+
+  public void shutdown() {
+    // stop all side input consumers and stores

Review Comment:
   Fixed, thanks!





[GitHub] [samza] prateekm commented on pull request #1655: Refactored ContainerStorageManager for readability and manageability

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on PR #1655:
URL: https://github.com/apache/samza/pull/1655#issuecomment-1457656110

   cc @bharathkk to review.

