You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/11/13 01:19:10 UTC

[GitHub] [samza] rmatharu commented on a change in pull request #1218: Samza 2382: User specified side input processor should take precedence over the identity processor

rmatharu commented on a change in pull request #1218: Samza 2382: User specified side input processor should take precedence over the identity processor
URL: https://github.com/apache/samza/pull/1218#discussion_r345528437
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 ##########
 @@ -526,51 +526,57 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
 
     Map<TaskName, Map<String, SideInputsProcessor>> sideInputStoresToProcessors = new HashMap<>();
-    getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> {
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
         sideInputStoresToProcessors.put(taskName, new HashMap<>());
+        TaskMode taskMode = taskModel.getTaskMode();
+
         for (String storeName : sideInputSystemStreams.keySet()) {
+
+          SideInputsProcessor sideInputsProcessor;
           Optional<String> sideInputsProcessorSerializedInstance =
               config.getSideInputsProcessorSerializedInstance(storeName);
+
           if (sideInputsProcessorSerializedInstance.isPresent()) {
-            sideInputStoresToProcessors.get(taskName)
-                .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
-                    sideInputsProcessorSerializedInstance.get()));
-          } else {
-            String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName)
-                .orElseThrow(() -> new SamzaException(
-                    String.format("Could not find sideInputs processor factory for store: %s", storeName)));
+
+            sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get());
+            LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName);
+
+          } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) {
+            String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get();
             SideInputsProcessorFactory sideInputsProcessorFactory =
                 ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class);
-            SideInputsProcessor sideInputsProcessor =
-                sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
-            sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
-          }
-        }
-      });
+            sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
+            LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
 
-    // creating identity sideInputProcessor for stores of standbyTasks
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-        sideInputStoresToProcessors.put(taskName, new HashMap<>());
-        for (String storeName : sideInputSystemStreams.keySet()) {
+          } else if (!config.getSideInputsProcessorFactory(storeName).isPresent() && taskMode.equals(TaskMode.Active)) {
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services