You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/22 07:06:33 UTC

[GitHub] [samza] bkonold commented on a change in pull request #1343: SAMZA-2353: Support standby containers with transactional state

bkonold commented on a change in pull request #1343:
URL: https://github.com/apache/samza/pull/1343#discussion_r429069274



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -625,7 +625,9 @@ public void run() {
               log.trace("Update offset for ssp {}, offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
 
               // update offset
-              task.offsetManager().update(task.taskName(), envelope.getSystemStreamPartition(), envelope.getOffset());
+              if (task.offsetManager() != null) {

Review comment:
       I'd rather keep it null. Defaulting to a default-constructed OffsetManager would make this implementation-dependent: if OffsetManager changes later, the default-constructed object may no longer be appropriate.

##########
File path: samza-core/src/main/scala/org/apache/samza/container/RunLoopTask.scala
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container
+
+import org.apache.samza.checkpoint.OffsetManager
+import org.apache.samza.scheduler.EpochTimeScheduler
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.{ReadableCoordinator, TaskCallbackFactory}
+import org.apache.samza.util.Logging
+
+abstract class RunLoopTask extends Logging {

Review comment:
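       Yes, that will be fine.
   
   `TaskInstance extends Logging with RunLoopTask`
   
   should work, provided RunLoopTask is declared as a trait rather than an abstract class (Scala only allows traits to be mixed in with `with`).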

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -797,57 +838,69 @@ public void run() {
       }
 
     } catch (InterruptedException e) {
-      LOG.warn("Received an interrupt during side inputs store restoration."
-          + " Exiting prematurely without completing store restore.");
+      LOG.warn("Received an interrupt during side inputs store restoration. Exiting prematurely without completing store restore.");
       /*
        * We want to stop side input restoration and rethrow the exception upstream. Container should handle the
        * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the
        * resources prematurely here.
        */
-      shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
+      this.sideInputRunLoop.shutdown();
       throw new SamzaException("Side inputs read was interrupted", e);
     }
 
     LOG.info("SideInput Restore complete");
   }
 
-  private boolean sideInputsPresent() {
-    return !this.sideInputSystemStreams.isEmpty();
+  private void startSideInputCheckpointPollingThread() {
+    sideInputCheckpointRefreshFuture = sideInputCheckpointRefreshExecutor.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
+            TaskName activeTaskName = StandbyTaskUtil.getActiveTaskName(taskName);
+            Checkpoint checkpoint = checkpointManager.readLastCheckpoint(activeTaskName);
+            if (checkpoint != null) {
+              checkpoint.getOffsets().forEach((ssp, latestOffset) -> {
+                  if (taskSideInputStoreSSPs.get(taskName).values().stream().flatMap(Set::stream).anyMatch(ssp::equals)) {

Review comment:
       +1

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -885,28 +938,31 @@ public void stopStores() {
   public void shutdown() {
     // stop all nonsideinputstores including persistent and non-persistent stores
     this.containerModel.getTasks().forEach((taskName, taskModel) ->
-        getNonSideInputStores(taskName).forEach((storeName, store) -> store.stop())
+        getNonSideInputStorageEngines(taskName).forEach((storeName, store) -> store.stop())
     );
 
     this.shouldShutdown = true;
 
     // stop all sideinput consumers and stores
     if (sideInputsPresent()) {
-      sideInputsReadExecutor.shutdownNow();
+      sideInputRunLoop.shutdown();
+      sideInputRunLoopExecutor.shutdownNow();
+
+      this.taskSideInputHandlers.values().forEach(TaskSideInputHandler::stop);

Review comment:
       This is actually not the case today: task shutdown is handled by SamzaContainer, not by RunLoop.
   
   We could change the lifecycle management to make RunLoop responsible for this instead. However, I have not investigated the feasibility of that change; instead, I tried to mimic the current relationship between SamzaContainer, TaskInstance, and RunLoop.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -797,57 +838,69 @@ public void run() {
       }
 
     } catch (InterruptedException e) {
-      LOG.warn("Received an interrupt during side inputs store restoration."
-          + " Exiting prematurely without completing store restore.");
+      LOG.warn("Received an interrupt during side inputs store restoration. Exiting prematurely without completing store restore.");
       /*
        * We want to stop side input restoration and rethrow the exception upstream. Container should handle the
        * interrupt exception and shutdown the components and cleaning up the resource. We don't want to clean up the
        * resources prematurely here.
        */
-      shouldShutdown = true; // todo: should we cancel the flush future right away or wait for container to handle it as part of shutdown sequence?
+      this.sideInputRunLoop.shutdown();
       throw new SamzaException("Side inputs read was interrupted", e);
     }
 
     LOG.info("SideInput Restore complete");
   }
 
-  private boolean sideInputsPresent() {
-    return !this.sideInputSystemStreams.isEmpty();
+  private void startSideInputCheckpointPollingThread() {
+    sideInputCheckpointRefreshFuture = sideInputCheckpointRefreshExecutor.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
+            TaskName activeTaskName = StandbyTaskUtil.getActiveTaskName(taskName);
+            Checkpoint checkpoint = checkpointManager.readLastCheckpoint(activeTaskName);
+            if (checkpoint != null) {
+              checkpoint.getOffsets().forEach((ssp, latestOffset) -> {
+                  if (taskSideInputStoreSSPs.get(taskName).values().stream().flatMap(Set::stream).anyMatch(ssp::equals)) {
+                    Optional<String> currentOffsetOpt = sideInputSSPCheckpointOffsets.get(ssp);
+                    Optional<String> latestOffsetOpt = Optional.ofNullable(latestOffset);
+                    SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+
+                    // if current isn't present and latest is, or
+                    // current is present and latest > current
+                    if ((!currentOffsetOpt.isPresent() && latestOffsetOpt.isPresent())
+                        || (currentOffsetOpt.isPresent() && systemAdmin.offsetComparator(latestOffset, currentOffsetOpt.get()) > 0)) {
+                      synchronized (sideInputSSPLocks.get(ssp)) {
+                        sideInputSSPCheckpointOffsets.put(ssp, latestOffsetOpt);
+                        sideInputSSPLocks.get(ssp).notifyAll();
+                      }
+                    }
+                  }
+                });
+            }
+          });
+      }
+    }, 0, new TaskConfig(config).getCommitMs(), TimeUnit.MILLISECONDS);
   }
 
-  // Method to check if the given offset means the stream is caught up for reads
-  private void checkSideInputCaughtUp(SystemStreamPartition ssp, String offset, SystemStreamMetadata.OffsetType offsetType, boolean isEndOfStream) {
-
-    if (isEndOfStream) {
-      this.initialSideInputSSPMetadata.remove(ssp);
-      this.sideInputsCaughtUp.countDown();
-      LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp, offset, offsetType);
-      return;
-    }
+  private void initializeSideInputSSPMetadata() {
+    Set<SystemStreamPartition> allSideInputSSPs = this.taskSideInputStoreSSPs.values().stream()
+        .flatMap(map -> map.values().stream())
+        .flatMap(Set::stream)
+        .collect(Collectors.toSet());
 
-    SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = this.initialSideInputSSPMetadata.get(ssp);
-    String offsetToCheck = sspMetadata == null ? null : sspMetadata.getOffset(offsetType);
-    LOG.trace("Checking {} offset {} against {} for {}.", offsetType, offset, offsetToCheck, ssp);
+    for (SystemStreamPartition ssp : allSideInputSSPs) {
+      SystemStreamMetadata systemStreamMetadata = streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
+      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
+          (systemStreamMetadata == null) ? null : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
 
-    // Let's compare offset of the chosen message with offsetToCheck.
-    Integer comparatorResult;
-    if (offset == null || offsetToCheck == null) {
-      comparatorResult = -1;
-    } else {
-      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
-      comparatorResult = systemAdmin.offsetComparator(offset, offsetToCheck);
+      // record a copy of the sspMetadata, to later check if its caught up
+      initialSideInputSSPMetadata.put(ssp, sspMetadata);
     }
+  }
 
-    // The SSP is no longer lagging if the envelope's offset is greater than or equal to the
-    // latest offset.
-    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
-
-      LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp, offset, offsetType);
-      // if its caught up, we remove the ssp from the map, and countDown the latch
-      this.initialSideInputSSPMetadata.remove(ssp);
-      this.sideInputsCaughtUp.countDown();
-      return;
-    }
+  private boolean sideInputsPresent() {
+    return !this.taskSideInputStoreSSPs.values().stream()
+        .allMatch(Map::isEmpty);

Review comment:
       Good idea. I'll change this.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -110,13 +114,13 @@
 public class ContainerStorageManager {

Review comment:
       https://issues.apache.org/jira/browse/SAMZA-2528

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/SideInputRestoreTask.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallbackFactory;
+import scala.collection.JavaConversions;
+
+
+class SideInputRestoreTask extends RunLoopTask {
+
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputRestoreTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskSideInputStorageManager taskSideInputStorageManager,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.taskSideInputStorageManager = taskSideInputStorageManager;
+    this.metrics = metrics;

Review comment:
       I can't find any notes from our discussion, so I'll sync up with you offline and come back to summarize it here.

##########
File path: samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
##########
@@ -1,296 +1,301 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage;
-
-import com.google.common.collect.ImmutableSet;
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Clock;
-import org.apache.samza.util.ScalaJavaUtil;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-public class TestTaskSideInputStorageManager {
-  private static final String LOGGED_STORE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "logged-store";
-  private static final String NON_LOGGED_STORE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "non-logged-store";
-
-  @Test
-  public void testInit() {
-    final String storeName = "test-init-store";
-    final String taskName = "test-init-task";
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ImmutableSet.of())
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-
-    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
-    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
-  }
-
-  @Test
-  public void testFlush() {
-    final String storeName = "test-flush-store";
-    final String taskName = "test-flush-task";
-    final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
-    final String offset = "123";
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ImmutableSet.of(ssp))
-        .build();
-    Map<String, StorageEngine> stores = new HashMap<>();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
-    testSideInputStorageManager.flush();
-
-    for (StorageEngine storageEngine : stores.values()) {
-      verify(storageEngine).flush();
-    }
-
-    verify(testSideInputStorageManager).writeOffsetFiles();
-
-    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
-    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
-
-    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
-    assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
-    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
-  }
-
-  @Test
-  public void testStop() {
-    final String storeName = "test-stop-store";
-    final String taskName = "test-stop-task";
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR)
-        .addInMemoryStore(storeName, ImmutableSet.of())
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.stop();
-
-    verify(testSideInputStorageManager.getStore(storeName)).stop();
-    verify(testSideInputStorageManager).writeOffsetFiles();
-  }
-
-  @Test
-  public void testWriteOffsetFilesForNonPersistedStore() {
-    final String storeName = "test-write-offset-non-persisted-store";
-    final String taskName = "test-write-offset-for-non-persisted-task";
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR)
-        .addInMemoryStore(storeName, ImmutableSet.of())
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.writeOffsetFiles(); // should be no-op
-    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
-
-    assertFalse("Store directory: " + storeDir.getPath() + " should not be created for non-persisted store", storeDir.exists());
-  }
-
-  @Test
-  public void testWriteOffsetFilesForPersistedStore() {
-    final String storeName = "test-write-offset-persisted-store";
-    final String storeName2 = "test-write-offset-persisted-store-2";
-
-    final String taskName = "test-write-offset-for-persisted-task";
-    final String offset = "123";
-    final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
-    final SystemStreamPartition ssp2 = new SystemStreamPartition("test-system2", "test-stream2", new Partition(0));
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ImmutableSet.of(ssp))
-        .addLoggedStore(storeName2, ImmutableSet.of(ssp2))
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp2, offset);
-    testSideInputStorageManager.writeOffsetFiles();
-    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
-
-    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
-
-    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
-
-    assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
-    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
-
-    assertTrue("Failed to get offset for ssp: " + ssp2.toString() + " from file.", fileOffsets.containsKey(ssp2));
-    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp2), offset);
-  }
-
-  @Test
-  public void testGetFileOffsets() {
-    final String storeName = "test-get-file-offsets-store";
-    final String taskName = "test-get-file-offsets-task";
-    final String offset = "123";
-
-    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
-        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
-        .collect(Collectors.toSet());
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ssps)
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    ssps.forEach(ssp -> testSideInputStorageManager.updateLastProcessedOffset(ssp, offset));
-    testSideInputStorageManager.writeOffsetFiles();
-
-    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
-
-    ssps.forEach(ssp -> {
-        assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
-        assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
-      });
-  }
-
-  @Test
-  public void testGetStartingOffsets() {
-    final String storeName = "test-get-starting-offset-store";
-    final String taskName = "test-get-starting-offset-task";
-
-    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
-        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
-        .collect(Collectors.toSet());
-
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ssps)
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
-        .collect(Collectors.toMap(Function.identity(), ssp -> {
-            int partitionId = ssp.getPartition().getPartitionId();
-            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
-            return String.valueOf(offset);
-          }));
-
-    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
-        .collect(Collectors.toMap(Function.identity(), ssp -> {
-            int partitionId = ssp.getPartition().getPartitionId();
-            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
-
-            return String.valueOf(offset);
-          }));
-
-    doCallRealMethod().when(testSideInputStorageManager).getStartingOffsets(fileOffsets, oldestOffsets);
-
-    Map<SystemStreamPartition, String> startingOffsets =
-        testSideInputStorageManager.getStartingOffsets(fileOffsets, oldestOffsets);
-
-    assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5);
-  }
-
-  private void initializeSideInputStorageManager(TaskSideInputStorageManager testSideInputStorageManager) {
-    doReturn(new HashMap<>()).when(testSideInputStorageManager).getStartingOffsets(any(), any());
-    testSideInputStorageManager.init();
-  }
-
-  private static final class MockTaskSideInputStorageManagerBuilder {
-    private final TaskName taskName;
-    private final String storeBaseDir;
-
-    private Clock clock = mock(Clock.class);
-    private Map<String, SideInputsProcessor> storeToProcessor = new HashMap<>();
-    private Map<String, StorageEngine> stores = new HashMap<>();
-    private Map<String, Set<SystemStreamPartition>> storeToSSps = new HashMap<>();
-    private StreamMetadataCache streamMetadataCache = mock(StreamMetadataCache.class);
-    private SystemAdmins systemAdmins = mock(SystemAdmins.class);
-
-    public MockTaskSideInputStorageManagerBuilder(String taskName, String storeBaseDir) {
-      this.taskName = new TaskName(taskName);
-      this.storeBaseDir = storeBaseDir;
-
-      initializeMocks();
-    }
-
-    private void initializeMocks() {
-      SystemAdmin admin = mock(SystemAdmin.class);
-      doAnswer(invocation -> {
-          String offset1 = invocation.getArgumentAt(0, String.class);
-          String offset2 = invocation.getArgumentAt(1, String.class);
-
-          return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
-        }).when(admin).offsetComparator(any(), any());
-      doAnswer(invocation -> {
-          Map<SystemStreamPartition, String> sspToOffsets = invocation.getArgumentAt(0, Map.class);
-
-          return sspToOffsets.entrySet()
-              .stream()
-              .collect(Collectors.toMap(Map.Entry::getKey,
-                  entry -> String.valueOf(Long.parseLong(entry.getValue()) + 1)));
-        }).when(admin).getOffsetsAfter(any());
-      doReturn(admin).when(systemAdmins).getSystemAdmin("test-system");
-
-      doReturn(ScalaJavaUtil.toScalaMap(new HashMap<>())).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
-    }
-
-    MockTaskSideInputStorageManagerBuilder addInMemoryStore(String storeName, Set<SystemStreamPartition> ssps) {
-      StorageEngine storageEngine = mock(StorageEngine.class);
-      when(storageEngine.getStoreProperties()).thenReturn(
-          new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(false).build());
-
-      stores.put(storeName, storageEngine);
-      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
-      storeToSSps.put(storeName, ssps);
-
-      return this;
-    }
-
-    MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName, Set<SystemStreamPartition> ssps) {
-      StorageEngine storageEngine = mock(StorageEngine.class);
-      when(storageEngine.getStoreProperties()).thenReturn(
-          new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(true).build());
-
-      stores.put(storeName, storageEngine);
-      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
-      storeToSSps.put(storeName, ssps);
-
-      return this;
-    }
-
-    TaskSideInputStorageManager build() {
-      return spy(new TaskSideInputStorageManager(taskName, TaskMode.Active, streamMetadataCache, new File(storeBaseDir), stores,
-          storeToProcessor, storeToSSps, systemAdmins, mock(Config.class), clock));
-    }
-  }
-}
\ No newline at end of file
+///*

Review comment:
       I commented this out so the build would pass while I waited for a thumbs-up on the class restructuring. In the next PR update, I'll rewrite these as tests for the new classes they belong to.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org