You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2019/05/22 01:34:40 UTC

[samza] branch master updated: SAMZA-2209: Explicitly handle empty Optionals in ContainerStorageManager when using StorageConfig (#1045)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f847b8d  SAMZA-2209: Explicitly handle empty Optionals in ContainerStorageManager when using StorageConfig (#1045)
f847b8d is described below

commit f847b8df1b32ff80ac059a310de7c10330cc4d27
Author: cameronlee314 <37...@users.noreply.github.com>
AuthorDate: Tue May 21 18:34:36 2019 -0700

    SAMZA-2209: Explicitly handle empty Optionals in ContainerStorageManager when using StorageConfig (#1045)
---
 .../org/apache/samza/storage/ContainerStorageManager.java  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 85e10ea..7e802bd 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -503,10 +503,12 @@ public class ContainerStorageManager {
                 .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
                     sideInputsProcessorSerializedInstance.get()));
           } else {
+            String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName)
+                .orElseThrow(() -> new SamzaException(
+                    String.format("Could not find side inputs processor factory for store: %s", storeName)));
             sideInputStoresToProcessors.get(taskName)
-                .put(storeName, Util.getObj(config.getSideInputsProcessorFactory(storeName).get(),
-                    SideInputsProcessorFactory.class).getSideInputsProcessor(config,
-                    taskInstanceMetrics.get(taskName).registry()));
+                .put(storeName, Util.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class)
+                    .getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry()));
           }
         }
       });
@@ -517,8 +519,10 @@ public class ContainerStorageManager {
         for (String storeName : sideInputSystemStreams.keySet()) {
 
           // have to use the right serde because the sideInput stores are created
-          Serde keySerde = serdes.get(config.getStorageKeySerde(storeName).get());
-          Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName).get());
+          Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
+              .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
+          Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
+              .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
           sideInputStoresToProcessors.get(taskName).put(storeName, new SideInputsProcessor() {
             @Override
             public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {