You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by lh...@apache.org on 2019/11/13 21:55:42 UTC

[samza] branch 1.3.0 updated: SAMZA-2382: User specified side input processor should take precedence over the identity processor (#1218)

This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.3.0 by this push:
     new 44c791f  SAMZA-2382: User specified side input processor should take precedence over the identity processor (#1218)
44c791f is described below

commit 44c791fefd74f20470d2669e3fea46d6d3780a4a
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Tue Nov 12 20:15:59 2019 -0800

    SAMZA-2382: User specified side input processor should take precedence over the identity processor (#1218)
    
    A user-specified side-input processor should take precedence over the identity processor for standby tasks.
---
 .../samza/storage/ContainerStorageManager.java     | 77 ++++++++++++----------
 1 file changed, 41 insertions(+), 36 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 7b97e6f..afd3e69 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -526,51 +526,56 @@ public class ContainerStorageManager {
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
 
     Map<TaskName, Map<String, SideInputsProcessor>> sideInputStoresToProcessors = new HashMap<>();
-    getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> {
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
         sideInputStoresToProcessors.put(taskName, new HashMap<>());
+        TaskMode taskMode = taskModel.getTaskMode();
+
         for (String storeName : sideInputSystemStreams.keySet()) {
+
+          SideInputsProcessor sideInputsProcessor;
           Optional<String> sideInputsProcessorSerializedInstance =
               config.getSideInputsProcessorSerializedInstance(storeName);
+
           if (sideInputsProcessorSerializedInstance.isPresent()) {
-            sideInputStoresToProcessors.get(taskName)
-                .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
-                    sideInputsProcessorSerializedInstance.get()));
-          } else {
-            String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName)
-                .orElseThrow(() -> new SamzaException(
-                    String.format("Could not find sideInputs processor factory for store: %s", storeName)));
+
+            sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get());
+            LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName);
+
+          } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) {
+            String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get();
             SideInputsProcessorFactory sideInputsProcessorFactory =
                 ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class);
-            SideInputsProcessor sideInputsProcessor =
-                sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
-            sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
-          }
-        }
-      });
+            sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
+            LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
 
-    // creating identity sideInputProcessor for stores of standbyTasks
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-        sideInputStoresToProcessors.put(taskName, new HashMap<>());
-        for (String storeName : sideInputSystemStreams.keySet()) {
-
-          // have to use the right serde because the sideInput stores are created
-          Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
-          Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
-          sideInputStoresToProcessors.get(taskName).put(storeName, new SideInputsProcessor() {
-            @Override
-            public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
-              // Ignore message if the key is null
-              if (message.getKey() == null) {
-                return ImmutableList.of();
-              } else {
-                // Skip serde if the message is null
-                return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
-                    message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage())));
+          } else {
+            // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy
+
+            // if this is a standby-task and the store is a non-side-input changelog store
+            // we creating identity sideInputProcessor for stores of standbyTasks
+            // have to use the right serde because the sideInput stores are created
+
+            Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
+                .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
+            Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
+                .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
+            sideInputsProcessor = new SideInputsProcessor() {
+              @Override
+              public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
+                // Ignore message if the key is null
+                if (message.getKey() == null) {
+                  return ImmutableList.of();
+                } else {
+                  // Skip serde if the message is null
+                  return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
+                      message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage())));
+                }
               }
-            }
-          });
+            };
+            LOG.info("Using identity side-inputs-processor for store: {}, task: {}", storeName, taskName);
+          }
+
+          sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
         }
       });