Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/23 08:01:43 UTC

[GitHub] [samza] bkonold opened a new pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

bkonold opened a new pull request #1367:
URL: https://github.com/apache/samza/pull/1367


   **Issues**: `TaskSideInputStorageManager` conflates processing and storage logic. This is problematic because adding support for transactional state in standby containers requires heavy additions to both. Refactoring is necessary to keep `TaskSideInputStorageManager` from becoming a difficult-to-maintain behemoth (like `ContainerStorageManager` is now). After this change, support for transactional state in standby containers can be introduced cleanly.
    
   **Changes**: Processing logic is moved out of `TaskSideInputStorageManager` and into a new class, `TaskSideInputHandler`. This includes coordination of oldestOffsets, lastProcessedOffsets, and startingOffsets, as well as the processing behavior for a given SSP envelope.
   
   Management of stores' StorageEngines remains in `TaskSideInputStorageManager` as it is today.
   
   `ContainerStorageManager` now interfaces only with `TaskSideInputHandler` to handle side input lifecycle, processing, and flush.
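   As a rough illustration of the new split, here is a minimal lifecycle sketch. The wrapper method below is hypothetical and not part of the PR; only the handler methods (`init`, `process`, `flush`, `stop`) come from the new class.
   
   ```java
   // Editorial sketch only: assumes the handler and the envelope are constructed elsewhere.
   void runSideInputLifecycle(TaskSideInputHandler handler, IncomingMessageEnvelope envelope) {
     handler.init();             // restores file offsets and computes starting offsets
     handler.process(envelope);  // applies the SideInputsProcessor output to the store and records the offset
     handler.flush();            // flushes the underlying stores with the last processed offsets
     handler.stop();             // stops the underlying TaskSideInputStorageManager
   }
   ```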
   
   There is **NO NEW** functionality added in this patch, only a refactor.
    
   **Tests**: Existing unit tests for `TaskSideInputStorageManager` have been adapted to the class's new responsibilities or moved and re-implemented for `TaskSideInputHandler`.
   
   **API Changes**: None. The classes touched are internal to Samza.
    
   **Upgrade Instructions**: None.
    
   **Usage Instructions**: None.
   
   **NOTES**: Since it is difficult to tell where new code comes from when new classes are introduced, I've included comments/annotations across the PR to indicate where code that appears as an "addition" was sourced from.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on pull request #1367:
URL: https://github.com/apache/samza/pull/1367#issuecomment-634450085


   > Nit: General observation: your refactored code seems to use `this.` in all methods, whereas in Samza I have often observed that `this.` is used only in the constructor, since the constructor inputs may be the same-named vars. Not necessary to address. Just curious why you chose this, or maybe I'm wrong about my Samza observations :)
   
   I have noticed a mix of both - though it seems that the `this.` prefix is used more often in constructors than it is outside them. IMO, it is more readable to have the prefix than not. However, any team-wide convention should take precedence; I did not find anything on the OSS coding guide: https://samza.apache.org/contribute/coding-guide.html
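   As a small illustrative example (not from the PR), the prefix is required in a constructor whose parameters shadow fields, and is purely a readability choice elsewhere:
   
   ```java
   public class Example {
     private final TaskName taskName;
   
     public Example(TaskName taskName) {
       // "this." is required here because the parameter shadows the field
       this.taskName = taskName;
     }
   
     public TaskName getTaskName() {
       // "this." is optional here; keeping it is only a readability preference
       return this.taskName;
     }
   }
   ```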





[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r429525357



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  public void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+    String envelopeOffset = envelope.getOffset();
+
+    for (String store: this.sspToStores.get(envelopeSSP)) {
+      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+      KeyValueStore keyValueStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(store);
+      Collection<Entry<?, ?>> entriesToBeWritten = storeProcessor.process(envelope, keyValueStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null we ignore the entry; if the value is null we issue a delete; otherwise we issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+  }

Review comment:
       copied from `TaskSideInputStorageManager`

##########
File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestTaskSideInputHandler {
+  private static final String TEST_TASK_NAME = "test-task";
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STORE = "test-store";
+  private static final String TEST_STREAM = "test-stream";
+
+  /**
+   * This test covers the case where calls to a systemAdmin's (e.g., KafkaSystemAdmin's) get-stream-metadata method return null.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
+
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStreamMetadata(Collections.singletonMap(new SystemStream(TEST_SYSTEM, TEST_STREAM),
+            new SystemStreamMetadata(TEST_STREAM, partitionMetadata)))
+        .addStore(TEST_STORE, ssps)
+        .build();
+
+    handler.init();
+
+    ssps.forEach(ssp -> {
+        String startingOffset = handler.getStartingOffset(
+            new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, ssp.getPartition()));
+        Assert.assertNull("Starting offset should be null", startingOffset);
+      });
+  }
+
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStore(storeName, ssps)
+        .build();
+
+    // set up file and oldest offsets. for even partitions, fileOffsets will be larger; for odd partitions oldestOffsets will be larger
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+            return String.valueOf(offset);
+          }));
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+
+            return String.valueOf(offset);
+          }));
+
+    doCallRealMethod().when(handler).getStartingOffsets(fileOffsets, oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets = handler.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5);
+    startingOffsets.forEach((ssp, offset) -> {
+        int partitionId = ssp.getPartition().getPartitionId();
+        String expectedOffset = partitionId % 2 == 0
+            // 1 + fileOffset
+            ? getOffsetAfter(String.valueOf(ssp.getPartition().getPartitionId() + 10))
+            // oldestOffset
+            : String.valueOf(ssp.getPartition().getPartitionId() + 10);
+        assertEquals("Larger of fileOffsets and oldestOffsets should always be chosen", expectedOffset, offset);
+      });

Review comment:
       New - added for stricter behavior check on `getStartingOffsets`

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  public void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+    String envelopeOffset = envelope.getOffset();
+
+    for (String store: this.sspToStores.get(envelopeSSP)) {
+      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+      KeyValueStore keyValueStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(store);
+      Collection<Entry<?, ?>> entriesToBeWritten = storeProcessor.process(envelope, keyValueStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null we ignore the entry; if the value is null we issue a delete; otherwise we issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+  }
+
+  public void flush() {
+    this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
+  }
+
+  public String getStartingOffset(SystemStreamPartition ssp) {
+    return this.startingOffsets.get(ssp);
+  }
+
+  public String getLastProcessedOffset(SystemStreamPartition ssp) {
+    return this.lastProcessedOffsets.get(ssp);
+  }
+
+  public void stop() {
+    this.taskSideInputStorageManager.stop(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Gets the starting offsets for the {@link SystemStreamPartition}s belonging to all the side input stores.
+   * If the local file offset is available and is greater than the oldest available offset from source, uses it,
+   * else falls back to oldest offset in the source.
+   *
+   * @param fileOffsets offsets from the local offset file
+   * @param oldestOffsets oldest offsets from the source
+   * @return a {@link Map} of {@link SystemStreamPartition} to offset
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getStartingOffsets(
+      Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) {
+    Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();
+
+    this.sspToStores.keySet().forEach(ssp -> {
+        String fileOffset = fileOffsets.get(ssp);
+        String oldestOffset = oldestOffsets.get(ssp);
+
+        startingOffsets.put(ssp,
+            this.storageManagerUtil.getStartingOffset(
+                ssp, this.systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffset, oldestOffset));
+      });
+
+    return startingOffsets;
+  }
+
+  /**
+   * Gets the oldest offset for the {@link SystemStreamPartition}s associated with all the store side inputs.
+   *   1. Groups the list of the SSPs based on system stream
+   *   2. Fetches the {@link SystemStreamMetadata} from {@link StreamMetadataCache}
+ *   3. For each system stream, fetches the corresponding partition metadata and populates the oldest offset
+ *      for the SSPs belonging to that system stream.
+   *
+   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. If partitionMetadata could not be
+   * obtained for any {@link SystemStreamPartition} the offset for it is populated as null.
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getOldestOffsets() {
+    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
+
+    // Step 1
+    Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = this.sspToStores.keySet().stream()
+        .collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
+
+    // Step 2
+    Map<SystemStream, SystemStreamMetadata> metadata = JavaConverters.mapAsJavaMapConverter(
+        streamMetadataCache.getStreamMetadata(
+            JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(), false)).asJava();
+
+    // Step 3
+    metadata.forEach((systemStream, systemStreamMetadata) -> {
+
+        // get the partition metadata for each system stream
+        Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
+            systemStreamMetadata.getSystemStreamPartitionMetadata();
+
+        // For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
+        // if partitionMetadata was not obtained for any SSP, populate oldest-offset as null
+        // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using lambda will NPE when getOldestOffset() is null
+        for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+          oldestOffsets.put(ssp, partitionMetadata.get(ssp.getPartition()).getOldestOffset());
+        }
+      });
+
+    return oldestOffsets;
+  }
+
+  private void validateProcessorConfiguration() {
+    Set<String> stores = this.sspToStores.values().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toSet());
+
+    stores.forEach(storeName -> {
+        if (!storeToProcessor.containsKey(storeName)) {
+          throw new SamzaException(
+              String.format("Side inputs processor missing for store: %s.", storeName));
+        }
+      });
+  }
+}

Review comment:
       copied from `TaskSideInputStorageManager`

##########
File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestTaskSideInputHandler {
+  private static final String TEST_TASK_NAME = "test-task";
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STORE = "test-store";
+  private static final String TEST_STREAM = "test-stream";
+
+  /**
+   * This test covers the case where calls to a systemAdmin's (e.g., KafkaSystemAdmin's) get-stream-metadata method return null.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
+
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStreamMetadata(Collections.singletonMap(new SystemStream(TEST_SYSTEM, TEST_STREAM),
+            new SystemStreamMetadata(TEST_STREAM, partitionMetadata)))
+        .addStore(TEST_STORE, ssps)
+        .build();
+
+    handler.init();
+
+    ssps.forEach(ssp -> {
+        String startingOffset = handler.getStartingOffset(
+            new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, ssp.getPartition()));
+        Assert.assertNull("Starting offset should be null", startingOffset);
+      });
+  }
+
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStore(storeName, ssps)
+        .build();
+
+    // set up file and oldest offsets. for even partitions, fileOffsets will be larger; for odd partitions oldestOffsets will be larger
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+            return String.valueOf(offset);
+          }));
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+
+            return String.valueOf(offset);
+          }));
+
+    doCallRealMethod().when(handler).getStartingOffsets(fileOffsets, oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets = handler.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5);

Review comment:
       adapted from `TaskSideInputStorageManager`







[GitHub] [samza] mynameborat commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r435042186



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -714,15 +729,15 @@ private void startSideInputs() {
     LOG.info("SideInput Restore started");
 
     // initialize the sideInputStorageManagers
-    getSideInputStorageManagers().forEach(sideInputStorageManager -> sideInputStorageManager.init());
+    getSideInputHandlers().forEach(handler -> handler.init());

Review comment:
       minor: prefer to use a method reference. 
   If you choose to, please do so in the rest of the places as well.
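   For example (a sketch assuming `getSideInputHandlers()` returns a collection of `TaskSideInputHandler`):
   
   ```java
   // lambda form, as in the current patch
   getSideInputHandlers().forEach(handler -> handler.init());
   
   // equivalent method reference, as suggested
   getSideInputHandlers().forEach(TaskSideInputHandler::init);
   ```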

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+

Review comment:
       Prefer to keep the old way of initializing this:
   ```
   this.sspsToStores = new HashMap<>();
   storesToSSPs.forEach((store, ssps) -> {
       for (SystemStreamPartition ssp : ssps) {
         sspsToStores.computeIfAbsent(ssp, key -> new HashSet<>());
         sspsToStores.computeIfPresent(ssp, (key, value) -> {
             value.add(store);
             return value;
           });
       }
     });
   ```
   as it's much more readable and simpler.
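   An equivalent, slightly shorter form of the same suggestion (editorial sketch using the same names) relies on `computeIfAbsent` returning the mapped set:
   
   ```java
   this.sspsToStores = new HashMap<>();
   storesToSSPs.forEach((store, ssps) -> {
     for (SystemStreamPartition ssp : ssps) {
       this.sspsToStores.computeIfAbsent(ssp, key -> new HashSet<>()).add(store);
     }
   });
   ```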

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  /**
+   * The {@link TaskName} associated with this {@link TaskSideInputHandler}
+   *
+   * @return the task name for this handler
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager} and determines starting offsets for each SSP.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets();
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);
+
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+  }
+
+  /**
+   * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
+   * Synchronized in order to be exclusive with flush().
+   *
+   * @param envelope incoming envelope to be processed
+   */
+  public synchronized void process(IncomingMessageEnvelope envelope) {

Review comment:
       Can we synchronize on a specific block, or just make lastProcessedOffsets a thread-safe data structure? 
   If no to both, can you add documentation to clarify that?
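   A minimal sketch of what the block-level alternative could look like (hypothetical; the lock object is an assumption, and this only illustrates the mechanism, not whether a narrower block still preserves the flush/process exclusivity the javadoc describes):
   
   ```java
   private final Object offsetLock = new Object();  // hypothetical lock shared by process() and flush()
   
   public void process(IncomingMessageEnvelope envelope) {
     // ... per-store writes as in the current patch ...
     synchronized (offsetLock) {
       this.lastProcessedOffsets.put(envelope.getSystemStreamPartition(), envelope.getOffset());
     }
   }
   
   public void flush() {
     synchronized (offsetLock) {
       this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
     }
   }
   ```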

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -258,9 +142,10 @@ private void initializeStoreDirectories() {
   /**
    * Writes the offset files for all side input stores one by one. There is one offset file per store.
    * Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum.
+   *
+   * @param lastProcessedOffsets The offset per SSP to write
    */
-  @VisibleForTesting
-  void writeOffsetFiles() {
+  public void writeOffsetFiles(Map<SystemStreamPartition, String> lastProcessedOffsets) {

Review comment:
       nit: now that you are changing the signature anyway, can we rename this to `writeFileOffsets` so that it is consistent with `getFileOffsets`?
   
   What do you think?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  /**
+   * The {@link TaskName} associated with this {@link TaskSideInputHandler}
+   *
+   * @return the task name for this handler
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager} and determines starting offsets for each SSP.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets();
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);
+
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+  }
+
+  /**
+   * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
+   * Synchronized in order to be exclusive with flush().
+   *
+   * @param envelope incoming envelope to be processed
+   */
+  public synchronized void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+    String envelopeOffset = envelope.getOffset();
+
+    for (String store: this.sspToStores.get(envelopeSSP)) {
+      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+      KeyValueStore keyValueStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(store);
+      Collection<Entry<?, ?>> entriesToBeWritten = storeProcessor.process(envelope, keyValueStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null we ignore the entry; if the value is null we issue a delete; otherwise we issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+  }
+
+  /**
+   * Flushes the underlying {@link TaskSideInputStorageManager}
+   * Synchronized in order to be exclusive with process().
+   */
+  public synchronized void flush() {
+    this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Gets the starting offset for the given side input {@link SystemStreamPartition}.
+   *
+   * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
+   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET} configurations. It will use the local offset
+   * file if it is valid, else it will fall back to the oldest offset in the stream.
+   *
+   * @param ssp side input system stream partition to get the starting offset for
+   * @return the starting offset
+   */
+  public String getStartingOffset(SystemStreamPartition ssp) {
+    return this.startingOffsets.get(ssp);
+  }
+
+  /**
+   * Gets the last processed offset for the given side input {@link SystemStreamPartition}.
+   *
+   * @param ssp side input system stream partition to get the last processed offset for
+   * @return the last processed offset
+   */
+  public String getLastProcessedOffset(SystemStreamPartition ssp) {
+    return this.lastProcessedOffsets.get(ssp);
+  }
+
+  /**
+   * Stops the underlying storage manager at the last processed offsets.
+   */
+  public void stop() {

Review comment:
       I might have forgotten the need for synchronization between flush & process. If it is related to `lastProcessedOffsets`, then we may have to do it in other places too.
   
   Or it may be related to access to `taskSideInputStorageManager` because that class is not thread safe.
   Either way, `stop` looks like it requires synchronization.
   

##########
File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;

Review comment:
       replace with actual imports.
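   For instance, the wildcard static imports could be expanded to the specific members the tests use (the exact list depends on the final test code):
   
   ```java
   import static org.junit.Assert.assertEquals;
   import static org.junit.Assert.assertTrue;
   import static org.mockito.Matchers.any;
   import static org.mockito.Mockito.doCallRealMethod;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;
   ```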

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -279,13 +164,12 @@ void writeOffsetFiles() {
   }
 
   /**
-   * Gets the side input SSP offsets for all stores from their local offset files.
+   * Gets the side input SSP offsets for all stores from their local offset files. This method should be executed only
+   * once at class initialization.

Review comment:
       What happens if it is executed after initialization? 
   If you really expect this to be executed only once at initialization, should we have a precondition check and throw an exception to explicitly warn about the consequences of it being invoked multiple times?
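   One way to make that explicit would be a guard in `getFileOffsets` (a hypothetical sketch, not the PR's code; the flag and helper names are assumptions):
   
   ```java
   // requires: import com.google.common.base.Preconditions;
   private boolean fileOffsetsRead = false;
   
   public Map<SystemStreamPartition, String> getFileOffsets() {
     Preconditions.checkState(!this.fileOffsetsRead,
         "getFileOffsets() is expected to be called only once, during initialization");
     this.fileOffsetsRead = true;
     return readOffsetFilesFromDisk();  // placeholder for the existing offset-file reading logic
   }
   ```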

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -329,18 +345,14 @@ public ContainerStorageManager(
   /**
    *  Creates SystemConsumer objects for store restoration, creating one consumer per system.
    */
-  private static Map<String, SystemConsumer> createConsumers(Map<String, Set<SystemStream>> systemStreams,
+  private static Map<String, SystemConsumer> createConsumers(Set<String> systems,

Review comment:
       Can we give this a more meaningful name than `systems`? I see the call sites pass `containerSideInputSystems` and `containerChangelogSystems`, so maybe use `storeSystems` or anything that conveys more information than `systems` :)

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {

Review comment:
       You use the one-parameter-per-line pattern here, but changed the storage manager's constructor from its original single-line parameter style to a multi-line one.
   I'd prefer this constructor to also follow suit with `TaskSideInputStorageManager`, since that style is concise and breaking out one parameter per line doesn't add much value in a constructor.
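
    For reference, a sketch of the two layouts under discussion (parameter lists abbreviated, not the
    exact PR signatures):

        // One parameter per line, as in the new handler:
        public TaskSideInputHandler(
            TaskName taskName,
            TaskMode taskMode,
            Clock clock) {
          // ...
        }

        // Wrapped only where needed, as in the original TaskSideInputStorageManager:
        public TaskSideInputStorageManager(TaskName taskName, TaskMode taskMode, File storeBaseDir,
            Map<String, StorageEngine> stores, Clock clock) {
          // ...
        }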

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();

Review comment:
       Suggest moving this to the top of the constructor (and passing the stores in as a parameter) so we fail fast and avoid unnecessary initialization.
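
    A minimal sketch of the fail-fast ordering being suggested; the parameterized
    validateProcessorConfiguration shown here is an assumption about how it could look, not the PR's
    implementation:

        public TaskSideInputHandler(TaskName taskName, /* ... */
            Map<String, Set<SystemStreamPartition>> storeToSSPs,
            Map<String, SideInputsProcessor> storeToProcessor, /* ... */ Clock clock) {
          // Validate before any other work so a misconfigured task fails immediately,
          // without first building maps or constructing the storage manager.
          validateProcessorConfiguration(storeToSSPs.keySet(), storeToProcessor);

          this.taskName = taskName;
          // ... remaining field assignments and TaskSideInputStorageManager construction ...
        }

        private static void validateProcessorConfiguration(Set<String> stores,
            Map<String, SideInputsProcessor> storeToProcessor) {
          stores.forEach(storeName -> {
            if (!storeToProcessor.containsKey(storeName)) {
              throw new SamzaException("No SideInputsProcessor configured for side input store " + storeName);
            }
          });
        }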

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;

Review comment:
       nit: why is this separated from the other private fields? If the intention is to group the private final fields that are initialized at declaration, then shouldn't `lastProcessedOffsets` be moved there as well?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);

Review comment:
       why are we constructing the storage manager here? why not have the CSM pass the constructed storage manager to the handler?
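
    A sketch of the alternative being suggested: ContainerStorageManager constructs the storage
    manager and injects it into the handler. The exact wiring below is an assumption for
    illustration, not the PR's code:

        // In ContainerStorageManager (hypothetical wiring):
        TaskSideInputStorageManager sideInputStorageManager = new TaskSideInputStorageManager(
            taskName, taskMode, storeBaseDir, sideInputStores, storeToSSPs, clock);
        TaskSideInputHandler sideInputHandler = new TaskSideInputHandler(taskName, sideInputStorageManager,
            storeToSSPs, storeToProcessor, systemAdmins, streamMetadataCache);

        // The handler then just holds the injected dependency:
        public TaskSideInputHandler(TaskName taskName, TaskSideInputStorageManager storageManager, /* ... */) {
          this.taskName = taskName;
          this.taskSideInputStorageManager = storageManager;
          // ...
        }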
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r430861393



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();

Review comment:
       Is this a new validation being added?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  public void process(IncomingMessageEnvelope envelope) {

Review comment:
       Major: I believe this was supposed to be `synchronized` so that it is mutually exclusive with the flush method. I see that synchronization is dropped from both methods in `TaskSideInputHandler` but is still present in the manager's flush.
   Won't that allow process in the handler and flush in the manager to run in parallel?
   Is there some change that makes it okay for them to no longer be exclusive? If so, please call it out.
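
    To make the concern concrete, a simplified sketch of the mutual exclusion being asked about
    (assuming, as before the refactor, that the run loop calls process() while a separate commit
    thread calls flush()):

        // Run-loop thread:
        public synchronized void process(IncomingMessageEnvelope envelope) {
          // apply the SideInputsProcessor to the envelope, write the resulting entries to the
          // store(s), then record the envelope's offset in lastProcessedOffsets for its SSP
        }

        // Commit thread:
        public synchronized void flush() {
          // flush the stores and write offset files from lastProcessedOffsets; the shared lock
          // guarantees the offsets written correspond to fully applied writes
        }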

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -120,35 +76,32 @@ public void init() {
     Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
     LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
 

Review comment:
       where is this getting used? or is it just for logging purposes?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  public void process(IncomingMessageEnvelope envelope) {

Review comment:
       Nit: Can you please add the doc for this method that was in the original code? I feel it is helpful.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }

Review comment:
       There were info logs for these in the original code. Any reason to remove them?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r430884635



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -120,35 +76,32 @@ public void init() {
     Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
     LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
 

Review comment:
       Good point. Nope, not used here. I'll remove this and add the log statement into the handler's init method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r435079989



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;

Review comment:
       Yeah, that was the intent, in which case I should move `lastProcessedOffsets` as well, unless you have any objections.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {

Review comment:
       That's fine. I think what probably happened is that I did a "change signature" refactor with IntelliJ, which changed the formatting in the storage manager class. Personally I prefer the single-line style for readability, but I don't have a strong opinion.
   
   I do think, though, that we should have some sort of style guidelines as part of the open source docs - I don't see any there now.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();

Review comment:
       Agree, I will do the same in `TaskSideInputStorageManager` as well, since that validation also happens at the end of the constructor and doesn't need to be there.

##########
File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;

Review comment:
       FWIW, I see a lot of usage of both styles, `*` static imports and per-method static imports.
   
   Will change.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -279,13 +164,12 @@ void writeOffsetFiles() {
   }
 
   /**
-   * Gets the side input SSP offsets for all stores from their local offset files.
+   * Gets the side input SSP offsets for all stores from their local offset files. This method should be executed only
+   * once at class initialization.

Review comment:
       Ah, I think I should actually remove this. There is no problem with calling it multiple times. At some point while writing this refactor I was unsure what the contract with `TaskSideInputHandler` would be around initialization and wrote this comment, but all this does is read the offset files - the caller can do as they please.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -329,18 +345,14 @@ public ContainerStorageManager(
   /**
    *  Creates SystemConsumer objects for store restoration, creating one consumer per system.
    */
-  private static Map<String, SystemConsumer> createConsumers(Map<String, Set<SystemStream>> systemStreams,
+  private static Map<String, SystemConsumer> createConsumers(Set<String> systems,

Review comment:
       Sure, I'll keep `storeSystems`.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  /**
+   * The {@link TaskName} associated with this {@link TaskSideInputHandler}
+   *
+   * @return the task name for this handler
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager} and determines starting offsets for each SSP.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets();
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);
+
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+  }
+
+  /**
+   * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
+   * Synchronized inorder to be exclusive with flush().
+   *
+   * @param envelope incoming envelope to be processed
+   */
+  public synchronized void process(IncomingMessageEnvelope envelope) {

Review comment:
       I was only preserving the existing synchronization, so I don't know the full context, but it appears to be necessary since `flush` and `process` are currently invoked concurrently from different threads. That seems to imply the rationale was that `writeOffsetFiles` would write the same offset for SSPs shared between stores, so I don't think `lastProcessedOffsets` being thread-safe is enough. In fact, I don't think it needs to be thread-safe at all, since `init` and all `process` calls are exclusive from one another.
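
    As an illustration of why a thread-safe map alone does not give the same guarantee, one
    hypothetical interleaving (not actual PR code):

        // With no synchronization, flush() can run in the middle of process():
        //   process():  keyValueStore.put(...) for storeA        // SSP shared by storeA and storeB
        //   flush():    flush stores + writeOffsetFiles(...)     // observes a half-applied envelope
        //   process():  keyValueStore.put(...) for storeB
        //   process():  lastProcessedOffsets.put(ssp, offset)
        // Making process() and flush() synchronized keeps each envelope's store writes and its
        // offset update atomic with respect to flush, which a ConcurrentHashMap by itself cannot do.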

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  /**
+   * The {@link TaskName} associated with this {@link TaskSideInputHandler}
+   *
+   * @return the task name for this handler
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager} and determines starting offsets for each SSP.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets();
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);
+
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+  }
+
+  /**
+   * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
+   * Synchronized inorder to be exclusive with flush().
+   *
+   * @param envelope incoming envelope to be processed
+   */
+  public synchronized void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+    String envelopeOffset = envelope.getOffset();
+
+    for (String store: this.sspToStores.get(envelopeSSP)) {
+      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+      KeyValueStore keyValueStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(store);
+      Collection<Entry<?, ?>> entriesToBeWritten = storeProcessor.process(envelope, keyValueStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null, ignore the entry; if the value is null, issue a delete; otherwise issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+  }
+
+  /**
+   * Flushes the underlying {@link TaskSideInputStorageManager}.
+   * Synchronized in order to be mutually exclusive with process().
+   */
+  public synchronized void flush() {
+    this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Gets the starting offset for the given side input {@link SystemStreamPartition}.
+   *
+   * Note: The method doesn't respect {@link org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
+   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET} configurations. It will use the local offset
+   * file if it is valid, else it will fall back to oldest offset in the stream.
+   *
+   * @param ssp side input system stream partition to get the starting offset for
+   * @return the starting offset
+   */
+  public String getStartingOffset(SystemStreamPartition ssp) {
+    return this.startingOffsets.get(ssp);
+  }
+
+  /**
+   * Gets the last processed offset for the given side input {@link SystemStreamPartition}.
+   *
+   * @param ssp side input system stream partition to get the last processed offset for
+   * @return the last processed offset
+   */
+  public String getLastProcessedOffset(SystemStreamPartition ssp) {
+    return this.lastProcessedOffsets.get(ssp);
+  }
+
+  /**
+   * Stops the underlying storage manager at the last processed offsets.
+   */
+  public void stop() {

Review comment:
       I don't think `stop` requires synchronization: CSM will have ceased interacting with the instance before invoking `stop`, so there will be nothing else to synchronize with. Though I should probably document that precondition.
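   
    A minimal sketch of what that documentation could look like (my wording, not in the PR yet; it assumes the storage manager's stop takes the offsets to persist, mirroring flush):
   
    ```java
    /**
     * Stops the underlying storage manager at the last processed offsets.
     * Precondition: the caller (ContainerStorageManager) has stopped delivering envelopes to this handler
     * and will not call process() or flush() concurrently, so no synchronization is needed here.
     */
    public void stop() {
      // assumed signature, mirroring flush(offsets) above
      this.taskSideInputStorageManager.stop(this.lastProcessedOffsets);
    }
    ```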

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+

Review comment:
       No strong feelings on this. Generally I favor streams over imperative code for transforms, as I find them easier to conceptualize and less error-prone, since they make any modification of the source explicit.
   
    Fine reverting, but I'm interested to hear any additional thoughts you have.
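   
    For comparison, a rough imperative equivalent of the inversion above would look something like the sketch below (the helper name is mine, and it needs a java.util.HashSet import on top of what the class already pulls in):
   
    ```java
    // Builds the SSP -> stores mapping without streams; behavior matches the groupingBy/mapping collector above.
    private static Map<SystemStreamPartition, Set<String>> buildSspToStores(
        Map<String, Set<SystemStreamPartition>> storeToSSPs) {
      Map<SystemStreamPartition, Set<String>> sspToStores = new HashMap<>();
      for (Map.Entry<String, Set<SystemStreamPartition>> storeAndSSPs : storeToSSPs.entrySet()) {
        for (SystemStreamPartition ssp : storeAndSSPs.getValue()) {
          sspToStores.computeIfAbsent(ssp, unused -> new HashSet<>()).add(storeAndSSPs.getKey());
        }
      }
      return sspToStores;
    }
    ```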

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);

Review comment:
       I ended up moving it since CSM otherwise had no reason to keep track of the instance, and I thought it better encapsulated lifecycle management (e.g. CSM can't mistakenly hold on to an instance that has been closed).
   
    I may actually end up moving this back out, depending on implementation details, when I move side input processing onto `RunLoop`; I may end up sharing the storage manager instances between the handler class and a new class that implements `RunLoopTask`.
   
    What are your thoughts for/against? If this is relatively inconsequential, I suggest we table it, as it's likely to change again in a subsequent PR.
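   
    If the storage manager does end up shared with a `RunLoopTask` implementation, constructor injection seems like the natural shape. Purely as a sketch of that alternative (not what this PR does; `buildSspToStores` stands in for the inversion currently done inline in the constructor):
   
    ```java
    // Hypothetical variant: the caller constructs the TaskSideInputStorageManager and owns its lifecycle.
    public TaskSideInputHandler(TaskName taskName,
        TaskSideInputStorageManager sharedStorageManager,
        Map<String, Set<SystemStreamPartition>> storeToSSPs,
        Map<String, SideInputsProcessor> storeToProcessor,
        SystemAdmins systemAdmins,
        StreamMetadataCache streamMetadataCache) {
      this.taskName = taskName;
      this.taskSideInputStorageManager = sharedStorageManager;
      this.storeToProcessor = storeToProcessor;
      this.systemAdmins = systemAdmins;
      this.streamMetadataCache = streamMetadataCache;
      this.sspToStores = buildSspToStores(storeToSSPs);
      validateProcessorConfiguration();
    }
    ```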
   
   

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
##########
@@ -258,9 +142,10 @@ private void initializeStoreDirectories() {
   /**
    * Writes the offset files for all side input stores one by one. There is one offset file per store.
    * Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum.
+   *
+   * @param lastProcessedOffsets The offset per SSP to write
    */
-  @VisibleForTesting
-  void writeOffsetFiles() {
+  public void writeOffsetFiles(Map<SystemStreamPartition, String> lastProcessedOffsets) {

Review comment:
       Sure, good suggestion. I'll change the name.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -714,15 +729,15 @@ private void startSideInputs() {
     LOG.info("SideInput Restore started");
 
     // initialize the sideInputStorageManagers
-    getSideInputStorageManagers().forEach(sideInputStorageManager -> sideInputStorageManager.init());
+    getSideInputHandlers().forEach(handler -> handler.init());

Review comment:
       Agreed - I left this alone initially to minimize changes. Will use a method reference instead.
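   
    Concretely, something like the following (assuming the surrounding ContainerStorageManager code stays as in this PR):
   
    ```java
    // initialize the sideInputHandlers using a method reference instead of a trivial lambda
    getSideInputHandlers().forEach(TaskSideInputHandler::init);
    ```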




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r435596884



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+

Review comment:
       Per the offline conversation, I will revert this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r437760212



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  /**
+   * The {@link TaskName} associated with this {@link TaskSideInputHandler}
+   *
+   * @return the task name for this handler
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager} and determines starting offsets for each SSP.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = this.taskSideInputStorageManager.getFileOffsets();
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, lastProcessedOffsets);
+
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+    LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+  }
+
+  /**
+   * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
+   * Synchronized in order to be mutually exclusive with flush().
+   *
+   * @param envelope incoming envelope to be processed
+   */
+  public synchronized void process(IncomingMessageEnvelope envelope) {

Review comment:
       Will revisit in a subsequent PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r430880650



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  public void process(IncomingMessageEnvelope envelope) {

Review comment:
       This is a very good catch. You are correct - I likely got ahead of myself, as this synchronization will become unnecessary once side input processing moves into `RunLoop`. However, as it stands, `flush` and `process` within this class still need to be mutually exclusive, and thus require synchronized modifiers.
   
    Thanks for noticing this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r430878347



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();

Review comment:
       This is not actually new - I just moved the bit validating the processor mapping out from the storage manager into the handler.
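   
    For context, the check amounts to verifying that every store backed by a side input SSP has a configured processor; roughly (a sketch from memory, not the exact lines in the diff):
   
    ```java
    private void validateProcessorConfiguration() {
      // Every store referenced by a side input SSP must have a SideInputsProcessor configured.
      this.sspToStores.values().stream()
          .flatMap(Set::stream)
          .distinct()
          .forEach(storeName -> {
            if (!this.storeToProcessor.containsKey(storeName)) {
              throw new SamzaException("Side inputs processor missing for store: " + storeName);
            }
          });
    }
    ```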




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat merged pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
mynameborat merged pull request #1367:
URL: https://github.com/apache/samza/pull/1367


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r430880059



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  public void process(IncomingMessageEnvelope envelope) {

Review comment:
       Yes - in fact I will add better javadoc to each method in this class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] lakshmi-manasa-g commented on pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
lakshmi-manasa-g commented on pull request #1367:
URL: https://github.com/apache/samza/pull/1367#issuecomment-634445001


   Also, great PR description - it helped me get a sense of what to expect :)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

Posted by GitBox <gi...@apache.org>.
bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r430878959



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    validateProcessorConfiguration();
+  }
+
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }

Review comment:
       Ah, good catch! No, probably a copy-paste error on my part. I'll put them back.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org