You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/23 08:09:22 UTC

[GitHub] [samza] bkonold commented on a change in pull request #1367: SAMZA-2530: Split out processing logic from TaskSideInputStorageManager

bkonold commented on a change in pull request #1367:
URL: https://github.com/apache/samza/pull/1367#discussion_r429525357



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  /**
+   * Creates a handler for the side inputs of a single task.
+   *
+   * The store-to-SSPs mapping is inverted into an SSP-to-stores mapping, a {@link TaskSideInputStorageManager}
+   * is created from the storage related arguments, and finally the processor configuration is validated.
+   *
+   * @param taskName name of the task, returned by {@link #getTaskName()}
+   * @param taskMode handed to the {@link TaskSideInputStorageManager}
+   * @param storeBaseDir handed to the {@link TaskSideInputStorageManager}
+   * @param storeToStorageEngines handed to the {@link TaskSideInputStorageManager}
+   * @param storeToSSPs side input SSPs keyed by store name
+   * @param storeToProcessor side inputs processor keyed by store name; must have an entry for every store
+   *                         that has at least one SSP in {@code storeToSSPs}
+   * @param systemAdmins system admins retained by this handler
+   * @param streamMetadataCache stream metadata cache retained by this handler
+   * @param clock handed to the {@link TaskSideInputStorageManager}
+   * @throws SamzaException if a store with side input SSPs has no entry in {@code storeToProcessor}
+   */
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    // Invert store -> SSPs into SSP -> stores: emit one (ssp, store) pair per registration, then group by SSP.
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    // Must run after sspToStores and storeToProcessor are assigned; both are read by the validation.
+    validateProcessorConfiguration();
+  }
+
+  /**
+   * @return the task name this handler was constructed with
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager}, seeds the last processed offsets with the
+   * file offsets it reports, and computes the starting offsets from those file offsets and the oldest offsets.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    // The file offsets become the initial "last processed" position of each SSP
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  /**
+   * Applies one side input message to every store registered for the message's SSP and then records the
+   * message's offset as the last processed offset of that SSP.
+   *
+   * For each store, the store's {@link SideInputsProcessor} turns the envelope into a collection of entries.
+   * Entries with a null key are skipped, entries with a null value delete the key, all others are put.
+   *
+   * @param envelope the incoming side input message
+   */
+  public void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
+    String offset = envelope.getOffset();
+
+    for (String storeName : this.sspToStores.get(ssp)) {
+      SideInputsProcessor processor = this.storeToProcessor.get(storeName);
+      KeyValueStore kvStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(storeName);
+      Collection<Entry<?, ?>> updates = processor.process(envelope, kvStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry update : updates) {
+        Object key = update.getKey();
+        if (key == null) {
+          // nothing can be written or deleted without a key
+          continue;
+        }
+
+        Object value = update.getValue();
+        if (value == null) {
+          // a null value deletes the key
+          kvStore.delete(key);
+        } else {
+          kvStore.put(key, value);
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(ssp, offset);
+  }

Review comment:
       copied from `TaskSideInputStorageManager`

##########
File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestTaskSideInputHandler {
+  private static final String TEST_TASK_NAME = "test-task";
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STORE = "test-store";
+  private static final String TEST_STREAM = "test-stream";
+
+  /**
+   * Covers the case where the stream metadata obtained through the system admin (e.g., KafkaSystemAdmin's
+   * get-stream-metadata method) carries a null value. The partition metadata below is built with a null first
+   * argument (presumably the oldest offset -- TODO confirm against SystemStreamPartitionMetadata), and the
+   * handler is expected to report a null starting offset for every SSP.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String taskName = "test-get-starting-offset-task";
+
+    // IntStream.range(1, 2) yields only partition id 1, so a single SSP is exercised
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
+
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStreamMetadata(Collections.singletonMap(new SystemStream(TEST_SYSTEM, TEST_STREAM),
+            new SystemStreamMetadata(TEST_STREAM, partitionMetadata)))
+        .addStore(TEST_STORE, ssps)
+        .build();
+
+    // init() is what computes the starting offsets that are queried below
+    handler.init();
+
+    ssps.forEach(ssp -> {
+        String startingOffset = handler.getStartingOffset(
+            new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, ssp.getPartition()));
+        Assert.assertNull("Starting offset should be null", startingOffset);
+      });
+  }
+
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStore(storeName, ssps)
+        .build();
+
+    // set up file and oldest offsets. for even partitions, fileOffsets will be larger; for odd partitions oldestOffsets will be larger
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+            return String.valueOf(offset);
+          }));
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+
+            return String.valueOf(offset);
+          }));
+
+    doCallRealMethod().when(handler).getStartingOffsets(fileOffsets, oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets = handler.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5);
+    startingOffsets.forEach((ssp, offset) -> {
+        int partitionId = ssp.getPartition().getPartitionId();
+        String expectedOffset = partitionId % 2 == 0
+            // 1 + fileOffset
+            ? getOffsetAfter(String.valueOf(ssp.getPartition().getPartitionId() + 10))
+            // oldestOffset
+            : String.valueOf(ssp.getPartition().getPartitionId() + 10);
+        assertEquals("Larger of fileOffsets and oldestOffsets should always be chosen", expectedOffset, offset);
+      });

Review comment:
       New - added for stricter behavior check on `getStartingOffsets`

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  /**
+   * Creates a handler for the side inputs of a single task.
+   *
+   * The store-to-SSPs mapping is inverted into an SSP-to-stores mapping, a {@link TaskSideInputStorageManager}
+   * is created from the storage related arguments, and finally the processor configuration is validated.
+   *
+   * @param taskName name of the task, returned by {@link #getTaskName()}
+   * @param taskMode handed to the {@link TaskSideInputStorageManager}
+   * @param storeBaseDir handed to the {@link TaskSideInputStorageManager}
+   * @param storeToStorageEngines handed to the {@link TaskSideInputStorageManager}
+   * @param storeToSSPs side input SSPs keyed by store name
+   * @param storeToProcessor side inputs processor keyed by store name; must have an entry for every store
+   *                         that has at least one SSP in {@code storeToSSPs}
+   * @param systemAdmins used to look up the system admin of an SSP's system when computing starting offsets
+   * @param streamMetadataCache used to fetch stream metadata when looking up oldest offsets
+   * @param clock handed to the {@link TaskSideInputStorageManager}
+   * @throws SamzaException if a store with side input SSPs has no entry in {@code storeToProcessor}
+   */
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Clock clock) {
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    // Invert store -> SSPs into SSP -> stores: emit one (ssp, store) pair per registration, then group by SSP.
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    this.taskSideInputStorageManager = new TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+
+    // Must run after sspToStores and storeToProcessor are assigned; both are read by the validation.
+    validateProcessorConfiguration();
+  }
+
+  /**
+   * @return the task name this handler was constructed with
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager}, seeds the last processed offsets with the
+   * file offsets it reports, and computes the starting offsets from those file offsets and the oldest offsets.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = taskSideInputStorageManager.getFileOffsets();
+    // The file offsets become the initial "last processed" position of each SSP
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+  }
+
+  /**
+   * Applies one side input message to every store registered for the message's SSP and then records the
+   * message's offset as the last processed offset of that SSP.
+   *
+   * For each store, the store's {@link SideInputsProcessor} turns the envelope into a collection of entries.
+   * Entries with a null key are skipped, entries with a null value delete the key, all others are put.
+   *
+   * @param envelope the incoming side input message
+   */
+  public void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
+    String offset = envelope.getOffset();
+
+    for (String storeName : this.sspToStores.get(ssp)) {
+      SideInputsProcessor processor = this.storeToProcessor.get(storeName);
+      KeyValueStore kvStore = (KeyValueStore) this.taskSideInputStorageManager.getStore(storeName);
+      Collection<Entry<?, ?>> updates = processor.process(envelope, kvStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry update : updates) {
+        Object key = update.getKey();
+        if (key == null) {
+          // nothing can be written or deleted without a key
+          continue;
+        }
+
+        Object value = update.getValue();
+        if (value == null) {
+          // a null value deletes the key
+          kvStore.delete(key);
+        } else {
+          kvStore.put(key, value);
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(ssp, offset);
+  }
+
+  /**
+   * Flushes through the {@link TaskSideInputStorageManager}, handing it the current last processed offsets.
+   */
+  public void flush() {
+    this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Looks up the starting offset computed by {@link #init()} for the given SSP.
+   * The starting offsets are only assigned in {@link #init()}, so this must not be called before it.
+   *
+   * @param ssp the side input SSP to look up
+   * @return the starting offset mapped to {@code ssp}, or null if there is none
+   */
+  public String getStartingOffset(SystemStreamPartition ssp) {
+    return this.startingOffsets.get(ssp);
+  }
+
+  /**
+   * Looks up the last processed offset of the given SSP. The value is seeded from the file offsets in
+   * {@link #init()} and overwritten by every {@link #process(IncomingMessageEnvelope)} call for that SSP.
+   *
+   * @param ssp the side input SSP to look up
+   * @return the last processed offset mapped to {@code ssp}, or null if there is none
+   */
+  public String getLastProcessedOffset(SystemStreamPartition ssp) {
+    return this.lastProcessedOffsets.get(ssp);
+  }
+
+  /**
+   * Stops the {@link TaskSideInputStorageManager}, handing it the current last processed offsets.
+   */
+  public void stop() {
+    this.taskSideInputStorageManager.stop(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Computes the starting offset of every side input {@link SystemStreamPartition} known to this handler.
+   * For each SSP the local file offset and the oldest offset of the source are handed, together with the
+   * system admin of the SSP's system, to {@link StorageManagerUtil#getStartingOffset}. The intent is that the
+   * local file offset is used when it is available and greater than the oldest available offset from the
+   * source, and that the oldest offset of the source is used otherwise.
+   *
+   * @param fileOffsets offsets from the local offset file
+   * @param oldestOffsets oldest offsets from the source
+   * @return a {@link Map} of {@link SystemStreamPartition} to offset
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getStartingOffsets(
+      Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) {
+    Map<SystemStreamPartition, String> result = new HashMap<>();
+
+    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
+      String fromFile = fileOffsets.get(ssp);
+      String fromSource = oldestOffsets.get(ssp);
+      String resolved = this.storageManagerUtil.getStartingOffset(
+          ssp, this.systemAdmins.getSystemAdmin(ssp.getSystem()), fromFile, fromSource);
+
+      result.put(ssp, resolved);
+    }
+
+    return result;
+  }
+
+  /**
+   * Gets the oldest offset for the {@link SystemStreamPartition}s associated with all the store side inputs.
+   *   1. Groups the list of the SSPs based on system stream
+   *   2. Fetches the {@link SystemStreamMetadata} from {@link StreamMetadataCache}
+   *   3. Fetches the partition metadata for each system stream and populates the oldest offset for the SSPs
+   *      belonging to the system stream.
+   *
+   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset. If partitionMetadata could not be
+   * obtained for any {@link SystemStreamPartition} the offset for it is populated as null.
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getOldestOffsets() {
+    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
+
+    // Step 1
+    Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = this.sspToStores.keySet().stream()
+        .collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
+
+    // Step 2
+    Map<SystemStream, SystemStreamMetadata> metadata = JavaConverters.mapAsJavaMapConverter(
+        streamMetadataCache.getStreamMetadata(
+            JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(), false)).asJava();
+
+    // Step 3
+    metadata.forEach((systemStream, systemStreamMetadata) -> {
+
+        // get the partition metadata for each system stream
+        Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
+            systemStreamMetadata.getSystemStreamPartitionMetadata();
+
+        // For SSPs belonging to the system stream, use the partition metadata to get the oldest offset
+        // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using lambda will NPE when getOldestOffset() is null
+        for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+          SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
+              partitionMetadata.get(ssp.getPartition());
+
+          // A partition may be absent from the fetched metadata; honor the documented contract and populate
+          // the oldest offset as null instead of dereferencing the missing entry.
+          oldestOffsets.put(ssp, sspMetadata == null ? null : sspMetadata.getOldestOffset());
+        }
+      });
+
+    return oldestOffsets;
+  }
+
+  /**
+   * Verifies that every store that has at least one side input SSP also has a {@link SideInputsProcessor}.
+   *
+   * @throws SamzaException for the first store found without an entry in the store-to-processor mapping
+   */
+  private void validateProcessorConfiguration() {
+    Set<String> storeNames = this.sspToStores.values().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toSet());
+
+    for (String storeName : storeNames) {
+      if (storeToProcessor.containsKey(storeName)) {
+        continue;
+      }
+
+      throw new SamzaException(
+          String.format("Side inputs processor missing for store: %s.", storeName));
+    }
+  }
+}

Review comment:
       copied from `TaskSideInputStorageManager`

##########
File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
+public class TestTaskSideInputHandler {
+  private static final String TEST_TASK_NAME = "test-task";
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STORE = "test-store";
+  private static final String TEST_STREAM = "test-stream";
+
+  /**
+   * Covers the case where the stream metadata obtained through the system admin (e.g., KafkaSystemAdmin's
+   * get-stream-metadata method) carries a null value. The partition metadata below is built with a null first
+   * argument (presumably the oldest offset -- TODO confirm against SystemStreamPartitionMetadata), and the
+   * handler is expected to report a null starting offset for every SSP.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String taskName = "test-get-starting-offset-task";
+
+    // IntStream.range(1, 2) yields only partition id 1, so a single SSP is exercised
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
+
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStreamMetadata(Collections.singletonMap(new SystemStream(TEST_SYSTEM, TEST_STREAM),
+            new SystemStreamMetadata(TEST_STREAM, partitionMetadata)))
+        .addStore(TEST_STORE, ssps)
+        .build();
+
+    // init() is what computes the starting offsets that are queried below
+    handler.init();
+
+    ssps.forEach(ssp -> {
+        String startingOffset = handler.getStartingOffset(
+            new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, ssp.getPartition()));
+        Assert.assertNull("Starting offset should be null", startingOffset);
+      });
+  }
+
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(idx)))
+        .collect(Collectors.toSet());
+
+
+    TaskSideInputHandler handler = new MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStore(storeName, ssps)
+        .build();
+
+    // set up file and oldest offsets. for even partitions, fileOffsets will be larger; for odd partitions oldestOffsets will be larger
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+            return String.valueOf(offset);
+          }));
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+
+            return String.valueOf(offset);
+          }));
+
+    doCallRealMethod().when(handler).getStartingOffsets(fileOffsets, oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets = handler.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5);

Review comment:
       adapted from `TaskSideInputStorageManager`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org