You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by dc...@apache.org on 2022/12/15 18:46:18 UTC
[samza] branch master updated: Fix restore factory not configured log (#1645)
This is an automated email from the ASF dual-hosted git repository.
dchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new ad8af0c2d Fix restore factory not configured log (#1645)
ad8af0c2d is described below
commit ad8af0c2d431646c00a773c63e671d4560b573b3
Author: Daniel Chen <xr...@uwaterloo.ca>
AuthorDate: Thu Dec 15 10:46:12 2022 -0800
Fix restore factory not configured log (#1645)
---
.../main/scala/org/apache/samza/storage/ContainerStorageManager.java | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 326908535..e58b8c301 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -441,6 +441,11 @@ public class ContainerStorageManager {
backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
StateBackendFactory factory = factories.get(factoryName);
+ if (factory == null) {
+ throw new SamzaException(
+ String.format("Required restore state backend factory: %s not found in configured factories %s",
+ factoryName, String.join(", ", factories.keySet())));
+ }
KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers,
inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes,
taskInstanceCollectors.get(taskName));