You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2019/05/22 01:34:40 UTC
[samza] branch master updated: SAMZA-2209: Explicitly handle empty
Optionals in ContainerStorageManager when using StorageConfig (#1045)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f847b8d SAMZA-2209: Explicitly handle empty Optionals in ContainerStorageManager when using StorageConfig (#1045)
f847b8d is described below
commit f847b8df1b32ff80ac059a310de7c10330cc4d27
Author: cameronlee314 <37...@users.noreply.github.com>
AuthorDate: Tue May 21 18:34:36 2019 -0700
SAMZA-2209: Explicitly handle empty Optionals in ContainerStorageManager when using StorageConfig (#1045)
---
.../org/apache/samza/storage/ContainerStorageManager.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 85e10ea..7e802bd 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -503,10 +503,12 @@ public class ContainerStorageManager {
.put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
sideInputsProcessorSerializedInstance.get()));
} else {
+ String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName)
+ .orElseThrow(() -> new SamzaException(
+ String.format("Could not find side inputs processor factory for store: %s", storeName)));
sideInputStoresToProcessors.get(taskName)
- .put(storeName, Util.getObj(config.getSideInputsProcessorFactory(storeName).get(),
- SideInputsProcessorFactory.class).getSideInputsProcessor(config,
- taskInstanceMetrics.get(taskName).registry()));
+ .put(storeName, Util.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class)
+ .getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry()));
}
}
});
@@ -517,8 +519,10 @@ public class ContainerStorageManager {
for (String storeName : sideInputSystemStreams.keySet()) {
// have to use the right serde because the sideInput stores are created
- Serde keySerde = serdes.get(config.getStorageKeySerde(storeName).get());
- Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName).get());
+ Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
+ .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
+ Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
+ .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
sideInputStoresToProcessors.get(taskName).put(storeName, new SideInputsProcessor() {
@Override
public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {