You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by lh...@apache.org on 2019/11/13 21:55:42 UTC
[samza] branch 1.3.0 updated: SAMZA-2382: User specified side input
processor should take precedence over the identity processor (#1218)
This is an automated email from the ASF dual-hosted git repository.
lhaiesp pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.3.0 by this push:
new 44c791f SAMZA-2382: User specified side input processor should take precedence over the identity processor (#1218)
44c791f is described below
commit 44c791fefd74f20470d2669e3fea46d6d3780a4a
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Tue Nov 12 20:15:59 2019 -0800
SAMZA-2382: User specified side input processor should take precedence over the identity processor (#1218)
A user-specified side-input processor should take precedence over the identity processor for standby tasks.
---
.../samza/storage/ContainerStorageManager.java | 77 ++++++++++++----------
1 file changed, 41 insertions(+), 36 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 7b97e6f..afd3e69 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -526,51 +526,56 @@ public class ContainerStorageManager {
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
Map<TaskName, Map<String, SideInputsProcessor>> sideInputStoresToProcessors = new HashMap<>();
- getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> {
+ containerModel.getTasks().forEach((taskName, taskModel) -> {
sideInputStoresToProcessors.put(taskName, new HashMap<>());
+ TaskMode taskMode = taskModel.getTaskMode();
+
for (String storeName : sideInputSystemStreams.keySet()) {
+
+ SideInputsProcessor sideInputsProcessor;
Optional<String> sideInputsProcessorSerializedInstance =
config.getSideInputsProcessorSerializedInstance(storeName);
+
if (sideInputsProcessorSerializedInstance.isPresent()) {
- sideInputStoresToProcessors.get(taskName)
- .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
- sideInputsProcessorSerializedInstance.get()));
- } else {
- String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName)
- .orElseThrow(() -> new SamzaException(
- String.format("Could not find sideInputs processor factory for store: %s", storeName)));
+
+ sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get());
+ LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName);
+
+ } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) {
+ String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get();
SideInputsProcessorFactory sideInputsProcessorFactory =
ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class);
- SideInputsProcessor sideInputsProcessor =
- sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
- sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
- }
- }
- });
+ sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
+ LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
- // creating identity sideInputProcessor for stores of standbyTasks
- getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
- sideInputStoresToProcessors.put(taskName, new HashMap<>());
- for (String storeName : sideInputSystemStreams.keySet()) {
-
- // have to use the right serde because the sideInput stores are created
- Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
- .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
- Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
- .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
- sideInputStoresToProcessors.get(taskName).put(storeName, new SideInputsProcessor() {
- @Override
- public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
- // Ignore message if the key is null
- if (message.getKey() == null) {
- return ImmutableList.of();
- } else {
- // Skip serde if the message is null
- return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
- message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage())));
+ } else {
+ // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy
+
+ // if this is a standby-task and the store is a non-side-input changelog store
+ // we creating identity sideInputProcessor for stores of standbyTasks
+ // have to use the right serde because the sideInput stores are created
+
+ Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
+ .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
+ Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
+ .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
+ sideInputsProcessor = new SideInputsProcessor() {
+ @Override
+ public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
+ // Ignore message if the key is null
+ if (message.getKey() == null) {
+ return ImmutableList.of();
+ } else {
+ // Skip serde if the message is null
+ return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
+ message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage())));
+ }
}
- }
- });
+ };
+ LOG.info("Using identity side-inputs-processor for store: {}, task: {}", storeName, taskName);
+ }
+
+ sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
}
});