You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/20 13:26:34 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1343: SAMZA-2353: Support standby containers with transactional state

mynameborat commented on a change in pull request #1343:
URL: https://github.com/apache/samza/pull/1343#discussion_r427966515



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TransactionalTaskSideInputStorageManager.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TransactionalTaskSideInputStorageManager extends NonTransactionalTaskSideInputStorageManager implements TaskSideInputStorageManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionalTaskSideInputStorageManager.class);
+
+  public TransactionalTaskSideInputStorageManager(
+      TaskName taskName,
+      TaskMode taskMode,
+      File storeBaseDir,
+      Map<String, StorageEngine> sideInputStores,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs,
+      Clock clock) {
+    super(taskName, taskMode, storeBaseDir, sideInputStores, storesToSSPs, clock);
+  }
+
+  @Override
+  public Map<String, Path> checkpoint(CheckpointId checkpointId) {
+    LOG.info("Creating checkpoint for task: {}", this.taskName);
+
+    Map<String, Path> checkpointPaths = new HashMap<>();
+    stores.forEach((store, storageEngine) ->
+      // TODO what subset of stores to checkpoint? an ssp can be a changelog side input for one store and a regular side input for another store
+      storageEngine.checkpoint(checkpointId).ifPresent(path -> checkpointPaths.put(store, path))
+    );
+    return checkpointPaths;
+  }
+
+  @Override
+  public void removeOldCheckpoints(String latestCheckpointId) {
+    LOG.info("Removing checkpoints older than: {} for task: {}", latestCheckpointId, this.taskName);
+    File[] storeDirs = storeBaseDir.listFiles((dir, name) -> stores.containsKey(name));
+    (storeDirs == null ? Stream.<File>empty() : Arrays.stream(storeDirs)).forEach(storeDir -> {

Review comment:
       Yes. I was mostly referring to the ternary since it was multi-line; I am fine with just splitting the declaration and keeping the forEach as is.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/SideInputRestoreTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallbackFactory;
+import scala.collection.JavaConversions;
+
+
+class SideInputRestoreTask extends RunLoopTask {
+
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputRestoreTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskSideInputStorageManager taskSideInputStorageManager,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.taskSideInputStorageManager = taskSideInputStorageManager;
+    this.metrics = metrics;

Review comment:
       I forgot where we landed here. Can you summarize our conversation?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.executors.KeyBasedExecutorService;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+// TODO pick a better name for this class?
+
+/**
+ * This class encapsulates all processing logic / state for all side input SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+  // marks the offsets per SSP that must be bootstrapped to, inclusive. container startup will block until these offsets have been processed
+  private final Map<SystemStreamPartition, String> sspBootstrapOffsets;
+
+  // these objects are SHARED WITH CONTAINER STORAGE MANAGER
+  // used to coordinate updates of checkpoint offsets by ContainerStorageManager
+  private final Map<SystemStreamPartition, Object> sspLockObjects;
+  // indicates the latest checkpoint per SSP. updated by ContainerStorageManager background thread
+  private final Map<SystemStreamPartition, Optional<String>> checkpointedOffsets;
+  // used to block container until each SSP reaches its bootstrap offset
+  private final CountDownLatch sideInputTasksCaughtUp;
+
+  private final Map<SystemStreamPartition, String> startingOffsets;
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets;
+  private final KeyBasedExecutorService checkpointedSSPExecutor;
+  private final KeyBasedExecutorService nonCheckpointedSSPExecutor;
+
+  public TaskSideInputHandler(
+      TaskName taskName,
+      TaskSideInputStorageManager taskSideInputStorageManager,
+      Map<String, Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      CountDownLatch sideInputTasksCaughtUp,
+      Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSSPMetadata,
+      Map<SystemStreamPartition, Object> sspLockObjects,
+      Map<SystemStreamPartition, Optional<String>> checkpointOffsets) {
+    this.taskName = taskName;
+    this.taskSideInputStorageManager = taskSideInputStorageManager;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.sideInputTasksCaughtUp = sideInputTasksCaughtUp;
+    this.storeToProcessor = storeToProcessor;
+    this.sspLockObjects = Collections.unmodifiableMap(sspLockObjects);
+    this.checkpointedOffsets = Collections.unmodifiableMap(checkpointOffsets);
+
+    this.sspToStores = storeToSSPs.entrySet().stream()
+        .flatMap(storeAndSSPs -> storeAndSSPs.getValue().stream()
+            .map(ssp -> new AbstractMap.SimpleImmutableEntry<>(ssp, storeAndSSPs.getKey())))
+        .collect(Collectors.groupingBy(
+            Map.Entry::getKey,
+            Collectors.mapping(Map.Entry::getValue, Collectors.toSet())));
+
+    // for non-checkpointed SSPs, use their newest offset. for checkpointed SSPs, use their current checkpoint
+    this.sspBootstrapOffsets = new HashMap<>();
+    initialSSPMetadata.entrySet().stream()
+        // only SSPs for this task
+        .filter(entry -> this.sspToStores.containsKey(entry.getKey()))
+        // that do not have checkpoints
+        .filter(entry -> !this.checkpointedOffsets.containsKey(entry.getKey()))
+        .forEach(entry -> this.sspBootstrapOffsets.put(entry.getKey(), entry.getValue().getNewestOffset()));
+    this.checkpointedOffsets.entrySet().stream()
+        .filter(entry -> entry.getValue().isPresent())
+        .forEach(entry -> this.sspBootstrapOffsets.put(entry.getKey(), entry.getValue().get()));
+
+    this.lastProcessedOffsets = taskSideInputStorageManager.getFileOffsets();
+    this.startingOffsets = getStartingOffsets(this.lastProcessedOffsets, getOldestOffsets());
+    this.startingOffsets.forEach((ssp, offset) -> checkCaughtUp(ssp, offset, true));
+
+
+    Set<String> checkpointedStores = this.sspToStores.entrySet().stream()
+        .filter(sspAndStores -> this.checkpointedOffsets.containsKey(sspAndStores.getKey()))
+        .flatMap(sspAndStores -> sspAndStores.getValue().stream())
+        .collect(Collectors.toSet());
+
+    Set<String> nonCheckpointedStores = this.sspToStores.entrySet().stream()
+        .filter(sspAndStores -> !this.checkpointedOffsets.containsKey(sspAndStores.getKey()))
+        .flatMap(sspAndStores -> sspAndStores.getValue().stream())
+        .collect(Collectors.toSet());
+
+    this.checkpointedSSPExecutor = new KeyBasedExecutorService(Math.max(1, checkpointedStores.size()));
+    this.nonCheckpointedSSPExecutor = new KeyBasedExecutorService(Math.max(1, nonCheckpointedStores.size()));
+  }
+
+  public void process(IncomingMessageEnvelope envelope, TaskCallbackFactory callbackFactory) {
+    TaskCallback callback = callbackFactory.createCallback();

Review comment:
       Moving this after the block that waits on the lock sounds good to me.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TransactionalTaskSideInputStorageManager.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class TransactionalTaskSideInputStorageManager extends NonTransactionalTaskSideInputStorageManager implements TaskSideInputStorageManager {

Review comment:
       That is true for now, although it can evolve to handle checkpoints in a non-transactional way if need be. Let me elaborate on my first point a bit more.
   E.g. `writeCheckpoints(...)` invokes the `writeCheckpoint(..)` of `NonTransactionalTaskSideInputStorageManager` before supplementing it with its own logic. Deciding where we need to perform the responsibilities of this class in tandem with `NonTransactionalTaskSideInputStorageManager` versus where we need to override those responsibilities is tricky and challenging without knowing the ins and outs of it.
   
   Additionally, helper methods such as store validation, inferring store properties, etc. could very well be common to all implementors of `TaskSideInputStorageManager`, but the current design forces them to either duplicate that code within their implementation or extend `NonTransactionalTaskSideInputStorageManager` even when that may not be necessary.

##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -625,7 +625,9 @@ public void run() {
               log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update offset
-              task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset());
+              if (task.offsetManager() != null) {

Review comment:
       I realized this is a getter. I am fine with leaving this as is; I'll leave it up to you.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -885,28 +938,31 @@ public void stopStores() {
   public void shutdown() {
     // stop all nonsideinputstores including persistent and non-persistent stores
     this.containerModel.getTasks().forEach((taskName, taskModel) ->
-        getNonSideInputStores(taskName).forEach((storeName, store) -> store.stop())
+        getNonSideInputStorageEngines(taskName).forEach((storeName, store) -> store.stop())
     );
 
     this.shouldShutdown = true;
 
     // stop all sideinput consumers and stores
     if (sideInputsPresent()) {
-      sideInputsReadExecutor.shutdownNow();
+      sideInputRunLoop.shutdown();
+      sideInputRunLoopExecutor.shutdownNow();
+
+      this.taskSideInputHandlers.values().forEach(TaskSideInputHandler::stop);

Review comment:
       Shouldn't the run loop take care of shutting down the tasks within it, which in turn can shut down the task side input handlers? Why do we need a handle on the side input handlers in CSM?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -239,66 +253,102 @@ public ContainerStorageManager(
     this.taskStores = createTaskStores(containerModel, jobContext, containerContext, storageEngineFactories, serdes, taskInstanceMetrics, taskInstanceCollectors);
 
     // create system consumers (1 per store system in changelogSystemStreams), and index it by storeName
-    Map<String, SystemConsumer> storeSystemConsumers = createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
-        e -> Collections.singleton(e.getValue()))), systemFactories, config, this.samzaContainerMetrics.registry());
+    Map<String, SystemConsumer> storeSystemConsumers = createConsumers(this.changelogSystemStreams.values().stream().map(SystemStream::getSystem).collect(
+        Collectors.toSet()),
+        systemFactories, config, this.samzaContainerMetrics.registry());
+
     this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers);
 
     // creating task restore managers
     this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock, this.samzaContainerMetrics);
 
     // create sideInput storage managers
-    sideInputStorageManagers = createSideInputStorageManagers(clock);
+    this.taskSideInputStorageManagers = createSideInputStorageManagers(clock);
+
+    this.taskSideInputHandlers = new HashMap<>();
 
-    // create sideInput consumers indexed by systemName
-    this.sideInputConsumers = createConsumers(this.sideInputSystemStreams, systemFactories, config, this.samzaContainerMetrics.registry());
+    Set<String> sideInputSystems = this.taskSideInputStoreSSPs.values().stream()
+        .flatMap(map -> map.values().stream())
+        .flatMap(Set::stream)
+        .map(SystemStreamPartition::getSystem)
+        .collect(Collectors.toSet());
+
+    this.sideInputSSPLocks = this.taskSideInputStoreSSPs.values().stream()
+        .flatMap(map -> map.values().stream())
+        .flatMap(Collection::stream)

Review comment:
       Can we extract this to a local variable and reuse it above for fetching `sideInputSystems` too? It looks like it's doing the same thing.
   

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -110,13 +114,13 @@
 public class ContainerStorageManager {

Review comment:
       Not dealing with it in this PR is fine. I just wanted to call it out to see if you had thought about this, and it seems like you have. It would be useful to capture it in a JIRA to track your ideas for improvements.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -110,13 +114,13 @@
 public class ContainerStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
-  private static final String SIDEINPUTS_READ_THREAD_NAME = "SideInputs Read Thread";
-  private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush Thread";
+  private static final String SIDEINPUTS_RUNLOOP_THREAD_NAME = "SideInputs RunLoop Thread";
+  private static final String SIDEINPUTS_CHECKPOINT_THREAD_NAME = "SideInputs Checkpoint Refresh Thread";
   private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
   // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer
 
   private static final int SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS = 10; // Timeout with which sideinput read thread checks for exceptions
-  private static final Duration SIDE_INPUT_FLUSH_TIMEOUT = Duration.ofMinutes(1); // Period with which sideinputs are flushed
+  private static final Duration SIDE_INPUT_CHECKPOINT_SHUTDOWN_TIMEOUT = Duration.ofMinutes(1); // Period with which sideinputs are flushed

Review comment:
       sounds good.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -797,57 +838,69 @@ public void run() {
       }
 
     } catch (InterruptedException e) {
-      LOG.warn("Received an interrupt during side inputs store restoration."
-          + " Exiting prematurely without completing store restore.");
+      LOG.warn("Received an interrupt during side inputs store restoration. Exiting prematurely without completing store restore.");
       /*
        * We want to stop side input restoration and rethrow the exception upstream. Container should handle the
        * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the
        * resources prematurely here.
        */
-      shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
+      this.sideInputRunLoop.shutdown();
       throw new SamzaException("Side inputs read was interrupted", e);
     }
 
     LOG.info("SideInput Restore complete");
   }
 
-  private boolean sideInputsPresent() {
-    return !this.sideInputSystemStreams.isEmpty();
+  private void startSideInputCheckpointPollingThread() {
+    sideInputCheckpointRefreshFuture = sideInputCheckpointRefreshExecutor.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
+            TaskName activeTaskName = StandbyTaskUtil.getActiveTaskName(taskName);
+            Checkpoint checkpoint = checkpointManager.readLastCheckpoint(activeTaskName);
+            if (checkpoint != null) {
+              checkpoint.getOffsets().forEach((ssp, latestOffset) -> {
+                  if (taskSideInputStoreSSPs.get(taskName).values().stream().flatMap(Set::stream).anyMatch(ssp::equals)) {

Review comment:
       Can we extract it to a boolean variable for readability?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -797,57 +838,69 @@ public void run() {
       }
 
     } catch (InterruptedException e) {
-      LOG.warn("Received an interrupt during side inputs store restoration."
-          + " Exiting prematurely without completing store restore.");
+      LOG.warn("Received an interrupt during side inputs store restoration. Exiting prematurely without completing store restore.");
       /*
        * We want to stop side input restoration and rethrow the exception upstream. Container should handle the
        * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the
        * resources prematurely here.
        */
-      shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
+      this.sideInputRunLoop.shutdown();
       throw new SamzaException("Side inputs read was interrupted", e);
     }
 
     LOG.info("SideInput Restore complete");
   }
 
-  private boolean sideInputsPresent() {
-    return !this.sideInputSystemStreams.isEmpty();
+  private void startSideInputCheckpointPollingThread() {
+    sideInputCheckpointRefreshFuture = sideInputCheckpointRefreshExecutor.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
+            TaskName activeTaskName = StandbyTaskUtil.getActiveTaskName(taskName);
+            Checkpoint checkpoint = checkpointManager.readLastCheckpoint(activeTaskName);
+            if (checkpoint != null) {
+              checkpoint.getOffsets().forEach((ssp, latestOffset) -> {
+                  if (taskSideInputStoreSSPs.get(taskName).values().stream().flatMap(Set::stream).anyMatch(ssp::equals)) {
+                    Optional<String> currentOffsetOpt = sideInputSSPCheckpointOffsets.get(ssp);
+                    Optional<String> latestOffsetOpt = Optional.ofNullable(latestOffset);
+                    SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+
+                    // if current isn't present and latest is, or
+                    // current is present and latest > current

Review comment:
       Can we update these comments to describe the scenario instead of literally restating what the code below does? If not, we can remove them IMO.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -797,57 +838,69 @@ public void run() {
       }
 
     } catch (InterruptedException e) {
-      LOG.warn("Received an interrupt during side inputs store restoration."
-          + " Exiting prematurely without completing store restore.");
+      LOG.warn("Received an interrupt during side inputs store restoration. Exiting prematurely without completing store restore.");
       /*
        * We want to stop side input restoration and rethrow the exception upstream. Container should handle the
        * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the
        * resources prematurely here.
        */
-      shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
+      this.sideInputRunLoop.shutdown();
       throw new SamzaException("Side inputs read was interrupted", e);
     }
 
     LOG.info("SideInput Restore complete");
   }
 
-  private boolean sideInputsPresent() {
-    return !this.sideInputSystemStreams.isEmpty();
+  private void startSideInputCheckpointPollingThread() {
+    sideInputCheckpointRefreshFuture = sideInputCheckpointRefreshExecutor.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
+            TaskName activeTaskName = StandbyTaskUtil.getActiveTaskName(taskName);
+            Checkpoint checkpoint = checkpointManager.readLastCheckpoint(activeTaskName);
+            if (checkpoint != null) {
+              checkpoint.getOffsets().forEach((ssp, latestOffset) -> {
+                  if (taskSideInputStoreSSPs.get(taskName).values().stream().flatMap(Set::stream).anyMatch(ssp::equals)) {
+                    Optional<String> currentOffsetOpt = sideInputSSPCheckpointOffsets.get(ssp);
+                    Optional<String> latestOffsetOpt = Optional.ofNullable(latestOffset);
+                    SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+
+                    // if current isn't present and latest is, or
+                    // current is present and latest > current
+                    if ((!currentOffsetOpt.isPresent() && latestOffsetOpt.isPresent())
+                        || (currentOffsetOpt.isPresent() && systemAdmin.offsetComparator(latestOffset, currentOffsetOpt.get()) > 0)) {
+                      synchronized (sideInputSSPLocks.get(ssp)) {
+                        sideInputSSPCheckpointOffsets.put(ssp, latestOffsetOpt);
+                        sideInputSSPLocks.get(ssp).notifyAll();
+                      }
+                    }
+                  }
+                });
+            }
+          });
+      }
+    }, 0, new TaskConfig(config).getCommitMs(), TimeUnit.MILLISECONDS);
   }
 
-  // Method to check if the given offset means the stream is caught up for reads
-  private void checkSideInputCaughtUp(SystemStreamPartition ssp, String offset, SystemStreamMetadata.OffsetType offsetType, boolean isEndOfStream) {
-
-    if (isEndOfStream) {
-      this.initialSideInputSSPMetadata.remove(ssp);
-      this.sideInputsCaughtUp.countDown();
-      LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp, offset, offsetType);
-      return;
-    }
+  private void initializeSideInputSSPMetadata() {
+    Set<SystemStreamPartition> allSideInputSSPs = this.taskSideInputStoreSSPs.values().stream()
+        .flatMap(map -> map.values().stream())
+        .flatMap(Set::stream)
+        .collect(Collectors.toSet());
 
-    SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = this.initialSideInputSSPMetadata.get(ssp);
-    String offsetToCheck = sspMetadata == null ? null : sspMetadata.getOffset(offsetType);
-    LOG.trace("Checking {} offset {} against {} for {}.", offsetType, offset, offsetToCheck, ssp);
+    for (SystemStreamPartition ssp : allSideInputSSPs) {
+      SystemStreamMetadata systemStreamMetadata = streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
+      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
+          (systemStreamMetadata == null) ? null : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
 
-    // Let's compare offset of the chosen message with offsetToCheck.
-    Integer comparatorResult;
-    if (offset == null || offsetToCheck == null) {
-      comparatorResult = -1;
-    } else {
-      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
-      comparatorResult = systemAdmin.offsetComparator(offset, offsetToCheck);
+      // record a copy of the sspMetadata, to later check if its caught up
+      initialSideInputSSPMetadata.put(ssp, sspMetadata);
     }
+  }
 
-    // The SSP is no longer lagging if the envelope's offset is greater than or equal to the
-    // latest offset.
-    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
-
-      LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp, offset, offsetType);
-      // if its caught up, we remove the ssp from the map, and countDown the latch
-      this.initialSideInputSSPMetadata.remove(ssp);
-      this.sideInputsCaughtUp.countDown();
-      return;
-    }
+  private boolean sideInputsPresent() {
+    return !this.taskSideInputStoreSSPs.values().stream()
+        .allMatch(Map::isEmpty);

Review comment:
       Can this be made an instance variable that is initialized within the constructor? We have a lot of signals within the constructor that indicate whether there are side inputs or not.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -149,22 +153,26 @@
   private final int maxChangeLogStreamPartitions; // The partition count of each changelog-stream topic. This is used for validating changelog streams before restoring.
 
   /* Sideinput related parameters */
-  private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map of sideInput system-streams indexed by store name
-  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs;
-  private final Map<SystemStreamPartition, TaskSideInputStorageManager> sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp, for simpler lookup for process()
-  private final Map<String, SystemConsumer> sideInputConsumers; // Mapping from storeSystemNames to SystemConsumers
+  // side inputs indexed first by task, then store name
+  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
+  private final Map<SystemStreamPartition, Object> sideInputSSPLocks;
+  private final Map<TaskName, TaskSideInputStorageManager> taskSideInputStorageManagers;
   private SystemConsumers sideInputSystemConsumers;
-  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
-      = new ConcurrentHashMap<>(); // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
-  private volatile CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
+  // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
+  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata = new ConcurrentHashMap<>();
+  // Used by the sideInput-read thread to signal to the main thread
+  private volatile CountDownLatch sideInputTasksCaughtUp;
   private volatile boolean shouldShutdown = false;
+  private final ConcurrentHashMap<SystemStreamPartition, Optional<String>> sideInputSSPCheckpointOffsets;

Review comment:
      Why do we have an `Optional<String>` as the value? Outside of initialization, we only ever update this map with a valid offset string (below). If the Optional is only used to check whether the map has been initialized for the SSP, can we use `containsKey` instead?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -714,81 +744,92 @@ private void startSideInputs() {
     LOG.info("SideInput Restore started");
 
     // initialize the sideInputStorageManagers
-    getSideInputStorageManagers().forEach(sideInputStorageManager -> sideInputStorageManager.init());
+    this.taskSideInputStorageManagers.values().forEach(TaskSideInputStorageManager::init);
 
-    // start the checkpointing thread at the commit-ms frequency
-    TaskConfig taskConfig = new TaskConfig(config);
-    sideInputsFlushFuture = sideInputsFlushExecutor.scheduleWithFixedDelay(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          getSideInputStorageManagers().forEach(sideInputStorageManager -> sideInputStorageManager.flush());
-        } catch (Exception e) {
-          LOG.error("Exception during flushing sideInputs", e);
-          sideInputException = e;
-        }
-      }
-    }, 0, taskConfig.getCommitMs(), TimeUnit.MILLISECONDS);
+    // fetch initial metadata for each SSP
+    initializeSideInputSSPMetadata();
 
-    // set the latch to the number of sideInput SSPs
-    this.sideInputsCaughtUp = new CountDownLatch(this.sideInputStorageManagers.keySet().size());
+    // start the thread to poll checkpoint topic
+    startSideInputCheckpointPollingThread();
 
-    // register all sideInput SSPs with the consumers
-    for (SystemStreamPartition ssp : sideInputStorageManagers.keySet()) {
-      String startingOffset = sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
+    // set the latch to the number of side input tasks
+    this.sideInputTasksCaughtUp = new CountDownLatch(this.taskSideInputStoreSSPs.keySet().size());
 
-      if (startingOffset == null) {
-        throw new SamzaException("No offset defined for SideInput SystemStreamPartition : " + ssp);
-      }
+    // creating sideInput store processors, one per store per task
+    Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputStoreProcessors =
+        createSideInputProcessors(new StorageConfig(config), this.containerModel, this.taskInstanceMetrics);
 
-      // register startingOffset with the sysConsumer and register a metric for it
-      sideInputSystemConsumers.register(ssp, startingOffset);
-      taskInstanceMetrics.get(sideInputStorageManagers.get(ssp).getTaskName()).addOffsetGauge(
-          ssp, ScalaJavaUtil.toScalaFunction(() -> sideInputStorageManagers.get(ssp).getLastProcessedOffset(ssp)));
+    Map<TaskName, RunLoopTask> taskMap = new HashMap<>();
+    this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+        if (storesToSSPs.isEmpty()) {
+          return;
+        }
+        Set<SystemStreamPartition> taskSSPs = taskSideInputStoreSSPs.get(taskName).values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
+
+        Map<String, SideInputsProcessor> sideInputsProcessors = taskSideInputStoreProcessors.get(taskName);
+
+        TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(
+            taskName,
+            this.taskSideInputStorageManagers.get(taskName),
+            this.taskSideInputStoreSSPs.get(taskName),
+            sideInputsProcessors,
+            this.systemAdmins,
+            this.streamMetadataCache,
+            this.sideInputTasksCaughtUp,
+            this.initialSideInputSSPMetadata,
+            this.sideInputSSPLocks,
+            this.sideInputSSPCheckpointOffsets);
+        this.taskSideInputHandlers.put(taskName, taskSideInputHandler);
+
+        taskSSPs.forEach(ssp -> {
+            String startingOffset = taskSideInputHandler.getStartingOffset(ssp);
+
+            if (startingOffset == null) {
+              throw new SamzaException("No offset defined for SideInput SystemStreamPartition : " + ssp);
+            }
 
-      SystemStreamMetadata systemStreamMetadata = streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
-      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
-          (systemStreamMetadata == null) ? null : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
+            // register startingOffset with the sysConsumer and register a metric for it
+            sideInputSystemConsumers.register(ssp, startingOffset);
+            taskInstanceMetrics.get(taskName).addOffsetGauge(
+                ssp, ScalaJavaUtil.toScalaFunction(() -> taskSideInputHandler.getLastProcessedOffset(ssp)));
+          });
 
-      // record a copy of the sspMetadata, to later check if its caught up
-      initialSideInputSSPMetadata.put(ssp, sspMetadata);
+        taskMap.put(taskName, new SideInputRestoreTask(taskName, taskSSPs, taskSideInputHandler, taskSideInputStorageManagers.get(taskName), this.taskInstanceMetrics.get(taskName)));
+      });
 
-      // check if the ssp is caught to upcoming, even at start
-      checkSideInputCaughtUp(ssp, startingOffset, SystemStreamMetadata.OffsetType.UPCOMING, false);
-    }
+    TaskConfig taskConfig = new TaskConfig(config);
+    sideInputRunLoop = new RunLoop(taskMap,
+        null,
+        sideInputSystemConsumers,
+        1, // restrict task concurrency to 1 for now

Review comment:
      Minor: I suggest adding comments to the other parameters in this constructor call that take raw values, just like this one.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/RunLoopTask.scala
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.scheduler.EpochTimeScheduler
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.{ReadableCoordinator, TaskCallbackFactory}
+import org.apache.samza.util.Logging
+
+abstract class RunLoopTask extends Logging {

Review comment:
      Agreed. Would it be possible to keep this as just `RunLoopTask` and have `TaskInstance` extend both `Logging` and `RunLoopTask`?
   
  I am no Scala expert either. @prateekm, do you have any suggestions on how to proceed here?

##########
File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
##########
@@ -1,296 +1,301 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage;
-
-import com.google.common.collect.ImmutableSet;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Clock;
-import org.apache.samza.util.ScalaJavaUtil;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-public class TestTaskSideInputStorageManager {
-  private static final String LOGGED_STORE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "logged-store";
-  private static final String NON_LOGGED_STORE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "non-logged-store";
-
-  @Test
-  public void testInit() {
-    final String storeName = "test-init-store";
-    final String taskName = "test-init-task";
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ImmutableSet.of())
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-
-    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
-    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
-  }
-
-  @Test
-  public void testFlush() {
-    final String storeName = "test-flush-store";
-    final String taskName = "test-flush-task";
-    final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
-    final String offset = "123";
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ImmutableSet.of(ssp))
-        .build();
-    Map<String, StorageEngine> stores = new HashMap<>();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
-    testSideInputStorageManager.flush();
-
-    for (StorageEngine storageEngine : stores.values()) {
-      verify(storageEngine).flush();
-    }
-
-    verify(testSideInputStorageManager).writeOffsetFiles();
-
-    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
-    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
-
-    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
-    assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
-    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
-  }
-
-  @Test
-  public void testStop() {
-    final String storeName = "test-stop-store";
-    final String taskName = "test-stop-task";
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR)
-        .addInMemoryStore(storeName, ImmutableSet.of())
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.stop();
-
-    verify(testSideInputStorageManager.getStore(storeName)).stop();
-    verify(testSideInputStorageManager).writeOffsetFiles();
-  }
-
-  @Test
-  public void testWriteOffsetFilesForNonPersistedStore() {
-    final String storeName = "test-write-offset-non-persisted-store";
-    final String taskName = "test-write-offset-for-non-persisted-task";
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR)
-        .addInMemoryStore(storeName, ImmutableSet.of())
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.writeOffsetFiles(); // should be no-op
-    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
-
-    assertFalse("Store directory: " + storeDir.getPath() + " should not be created for non-persisted store", storeDir.exists());
-  }
-
-  @Test
-  public void testWriteOffsetFilesForPersistedStore() {
-    final String storeName = "test-write-offset-persisted-store";
-    final String storeName2 = "test-write-offset-persisted-store-2";
-
-    final String taskName = "test-write-offset-for-persisted-task";
-    final String offset = "123";
-    final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
-    final SystemStreamPartition ssp2 = new SystemStreamPartition("test-system2", "test-stream2", new Partition(0));
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ImmutableSet.of(ssp))
-        .addLoggedStore(storeName2, ImmutableSet.of(ssp2))
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp2, offset);
-    testSideInputStorageManager.writeOffsetFiles();
-    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
-
-    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
-
-    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
-
-    assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
-    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
-
-    assertTrue("Failed to get offset for ssp: " + ssp2.toString() + " from file.", fileOffsets.containsKey(ssp2));
-    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp2), offset);
-  }
-
-  @Test
-  public void testGetFileOffsets() {
-    final String storeName = "test-get-file-offsets-store";
-    final String taskName = "test-get-file-offsets-task";
-    final String offset = "123";
-
-    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
-        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
-        .collect(Collectors.toSet());
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ssps)
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    ssps.forEach(ssp -> testSideInputStorageManager.updateLastProcessedOffset(ssp, offset));
-    testSideInputStorageManager.writeOffsetFiles();
-
-    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
-
-    ssps.forEach(ssp -> {
-        assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
-        assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
-      });
-  }
-
-  @Test
-  public void testGetStartingOffsets() {
-    final String storeName = "test-get-starting-offset-store";
-    final String taskName = "test-get-starting-offset-task";
-
-    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
-        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
-        .collect(Collectors.toSet());
-
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ssps)
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
-        .collect(Collectors.toMap(Function.identity(), ssp -> {
-            int partitionId = ssp.getPartition().getPartitionId();
-            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
-            return String.valueOf(offset);
-          }));
-
-    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
-        .collect(Collectors.toMap(Function.identity(), ssp -> {
-            int partitionId = ssp.getPartition().getPartitionId();
-            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
-
-            return String.valueOf(offset);
-          }));
-
-    doCallRealMethod().when(testSideInputStorageManager).getStartingOffsets(fileOffsets, oldestOffsets);
-
-    Map<SystemStreamPartition, String> startingOffsets =
-        testSideInputStorageManager.getStartingOffsets(fileOffsets, oldestOffsets);
-
-    assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5);
-  }
-
-  private void initializeSideInputStorageManager(TaskSideInputStorageManager testSideInputStorageManager) {
-    doReturn(new HashMap<>()).when(testSideInputStorageManager).getStartingOffsets(any(), any());
-    testSideInputStorageManager.init();
-  }
-
-  private static final class MockTaskSideInputStorageManagerBuilder {
-    private final TaskName taskName;
-    private final String storeBaseDir;
-
-    private Clock clock = mock(Clock.class);
-    private Map<String, SideInputsProcessor> storeToProcessor = new HashMap<>();
-    private Map<String, StorageEngine> stores = new HashMap<>();
-    private Map<String, Set<SystemStreamPartition>> storeToSSps = new HashMap<>();
-    private StreamMetadataCache streamMetadataCache = mock(StreamMetadataCache.class);
-    private SystemAdmins systemAdmins = mock(SystemAdmins.class);
-
-    public MockTaskSideInputStorageManagerBuilder(String taskName, String storeBaseDir) {
-      this.taskName = new TaskName(taskName);
-      this.storeBaseDir = storeBaseDir;
-
-      initializeMocks();
-    }
-
-    private void initializeMocks() {
-      SystemAdmin admin = mock(SystemAdmin.class);
-      doAnswer(invocation -> {
-          String offset1 = invocation.getArgumentAt(0, String.class);
-          String offset2 = invocation.getArgumentAt(1, String.class);
-
-          return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
-        }).when(admin).offsetComparator(any(), any());
-      doAnswer(invocation -> {
-          Map<SystemStreamPartition, String> sspToOffsets = invocation.getArgumentAt(0, Map.class);
-
-          return sspToOffsets.entrySet()
-              .stream()
-              .collect(Collectors.toMap(Map.Entry::getKey,
-                  entry -> String.valueOf(Long.parseLong(entry.getValue()) + 1)));
-        }).when(admin).getOffsetsAfter(any());
-      doReturn(admin).when(systemAdmins).getSystemAdmin("test-system");
-
-      doReturn(ScalaJavaUtil.toScalaMap(new HashMap<>())).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
-    }
-
-    MockTaskSideInputStorageManagerBuilder addInMemoryStore(String storeName, Set<SystemStreamPartition> ssps) {
-      StorageEngine storageEngine = mock(StorageEngine.class);
-      when(storageEngine.getStoreProperties()).thenReturn(
-          new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(false).build());
-
-      stores.put(storeName, storageEngine);
-      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
-      storeToSSps.put(storeName, ssps);
-
-      return this;
-    }
-
-    MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName, Set<SystemStreamPartition> ssps) {
-      StorageEngine storageEngine = mock(StorageEngine.class);
-      when(storageEngine.getStoreProperties()).thenReturn(
-          new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(true).build());
-
-      stores.put(storeName, storageEngine);
-      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
-      storeToSSps.put(storeName, ssps);
-
-      return this;
-    }
-
-    TaskSideInputStorageManager build() {
-      return spy(new TaskSideInputStorageManager(taskName, TaskMode.Active, streamMetadataCache, new File(storeBaseDir), stores,
-          storeToProcessor, storeToSSps, systemAdmins, mock(Config.class), clock));
-    }
-  }
-}
\ No newline at end of file
+///*

Review comment:
      Why are we commenting this out? Are you planning to fix this in a future iteration?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org