You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by dc...@apache.org on 2022/12/15 18:46:18 UTC

[samza] branch master updated: Fix restore factory not configured log (#1645)

This is an automated email from the ASF dual-hosted git repository.

dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new ad8af0c2d Fix restore factory not configured log (#1645)
ad8af0c2d is described below

commit ad8af0c2d431646c00a773c63e671d4560b573b3
Author: Daniel Chen <xr...@uwaterloo.ca>
AuthorDate: Thu Dec 15 10:46:12 2022 -0800

    Fix restore factory not configured log (#1645)
---
 .../main/scala/org/apache/samza/storage/ContainerStorageManager.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 326908535..e58b8c301 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -441,6 +441,11 @@ public class ContainerStorageManager {
 
     backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
       StateBackendFactory factory = factories.get(factoryName);
+      if (factory == null) {
+        throw new SamzaException(
+            String.format("Required restore state backend factory: %s not found in configured factories %s",
+                factoryName, String.join(", ", factories.keySet())));
+      }
       KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers,
           inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes,
           taskInstanceCollectors.get(taskName));