You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/07/01 16:16:11 UTC

[GitHub] [samza] mynameborat commented on a change in pull request #1385: SAMZA-2550: Move side input processing to use RunLoop

mynameborat commented on a change in pull request #1385:
URL: https://github.com/apache/samza/pull/1385#discussion_r443689543



##########
File path: samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.scheduler.EpochTimeScheduler;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+
+
+/**
+ * This class encapsulates the processing logic for side input streams. It is executed by {@link org.apache.samza.container.RunLoop}
+ */
+public class SideInputTask implements RunLoopTask {
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public TaskName taskName() {
+    return this.taskName;
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, ReadableCoordinator coordinator,
+      TaskCallbackFactory callbackFactory) {
+    TaskCallback callback = callbackFactory.createCallback();
+    this.metrics.processes().inc();
+    try {
+      this.taskSideInputHandler.process(envelope);
+      this.metrics.messagesActuallyProcessed().inc();
+      callback.complete();
+    } catch (Exception e) {
+      callback.failure(e);
+    }
+  }
+
+  @Override
+  public void window(ReadableCoordinator coordinator) {
+
+  }
+
+  @Override
+  public void scheduler(ReadableCoordinator coordinator) {
+
+  }

Review comment:
       I understand these don't get called for side input tasks. Should we still throw an exception or log a warning here as a safeguard?
   Consider doing the same for the other interface methods that aren't implemented here.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -63,19 +65,23 @@
   private final Map<String, SideInputsProcessor> storeToProcessor;
   private final SystemAdmins systemAdmins;
   private final StreamMetadataCache streamMetadataCache;
+  // indicates to container that all side input ssps in this task are caught up

Review comment:
       nit: "indicates to _CSM_" would be simpler and still accurate. As written, understanding this comment requires the entire end-to-end picture of how CSM interacts with the container.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -110,14 +108,12 @@
 public class ContainerStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
-  private static final String SIDEINPUTS_READ_THREAD_NAME = "SideInputs Read Thread";
-  private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush Thread";
+  private static final String SIDEINPUTS_THREAD_NAME = "SideInputs Thread";
   private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
   // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer
 
-  private static final int SIDE_INPUT_READ_THREAD_TIMEOUT_SECONDS = 10; // Timeout with which sideinput read thread checks for exceptions
-  private static final Duration SIDE_INPUT_FLUSH_TIMEOUT = Duration.ofMinutes(1); // Period with which sideinputs are flushed
-
+  private static final int SIDE_INPUT_LATCH_TIMEOUT_SECONDS = 10; // Timeout with which sideinput thread checks for exceptions

Review comment:
       Can we update the comment here? This latch is currently used to check the caught-up status; side input exceptions are now populated by the run loop flow.
   
   Also, I'd suggest renaming this to something like `SIDE_INPUT_CAUGHT_CHECK_TIMEOUT` instead of referring to a latch, since otherwise one has to figure out what the latch is used for. Maybe also clarify in the comment that the check timeout applies across all the side inputs.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.scheduler.EpochTimeScheduler;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+
+
+/**
+ * This class encapsulates the processing logic for side input streams. It is executed by {@link org.apache.samza.container.RunLoop}
+ */
+public class SideInputTask implements RunLoopTask {
+  private final TaskName taskName;
+  private final Set<SystemStreamPartition> taskSSPs;
+  private final TaskSideInputHandler taskSideInputHandler;
+  private final TaskInstanceMetrics metrics;
+
+  public SideInputTask(
+      TaskName taskName,
+      Set<SystemStreamPartition> taskSSPs,
+      TaskSideInputHandler taskSideInputHandler,
+      TaskInstanceMetrics metrics) {
+    this.taskName = taskName;
+    this.taskSSPs = taskSSPs;
+    this.taskSideInputHandler = taskSideInputHandler;
+    this.metrics = metrics;
+  }
+
+  @Override
+  public TaskName taskName() {
+    return this.taskName;
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, ReadableCoordinator coordinator,
+      TaskCallbackFactory callbackFactory) {
+    TaskCallback callback = callbackFactory.createCallback();
+    this.metrics.processes().inc();
+    try {
+      this.taskSideInputHandler.process(envelope);
+      this.metrics.messagesActuallyProcessed().inc();

Review comment:
       > New side input processing metrics emitted by SamzaContainerMetrics and TaskInstanceMetrics are given their own namespace in order to differentiate from the primary container / run loop.
   
   Does this mean that after this change, the previously emitted metrics (if any) will no longer work? If we didn't emit metrics for these before this change, well and good. If we did, I would suggest updating the API changes description to mention the metrics.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -259,6 +288,44 @@ public void stop() {
     return oldestOffsets;
   }
 
+  /**
+   * An SSP is considered caught up once the offset indicated for it in {@link #sspOffsetsToBlockUntil} has been
+   * processed. Once the set of SSPs to catch up becomes empty, the latch for the task will count down, notifying
+   * {@link ContainerStorageManager} that it is caught up.
+   *
+   * @param ssp The SSP to be checked
+   * @param currentOffset The offset to be checked
+   * @param isStartingOffset Indicates whether the offset being checked is the starting offset of the SSP (and thus has
+   *                         not yet been processed). This will be set to true when each SSP's starting offset is checked
+   *                         on init, and false when checking if an ssp is caught up after processing an envelope.
+   */
+  private void checkCaughtUp(SystemStreamPartition ssp, String currentOffset, boolean isStartingOffset) {
+    String offsetToBlockUntil = this.sspOffsetsToBlockUntil.get(ssp);
+
+    LOG.trace("Checking offset {} against {} for {}. isStartingOffset: {}", currentOffset, offsetToBlockUntil, ssp, isStartingOffset);
+
+    Integer comparatorResult;
+    if (currentOffset == null || offsetToBlockUntil == null) {
+      comparatorResult = -1;
+    } else {
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+      comparatorResult = systemAdmin.offsetComparator(currentOffset, offsetToBlockUntil);
+    }
+
+    // If the starting offset, it must be greater (since the envelope at the starting offset will not yet have been processed)
+    // If not the starting offset, it must be greater than OR equal
+    if (comparatorResult != null && ((isStartingOffset && comparatorResult > 0) || (!isStartingOffset && comparatorResult >= 0))) {
+      LOG.info("Side input ssp {} has caught up to offset {}.", ssp, offsetToBlockUntil);

Review comment:
       Can we keep the existing functionality as is in this PR? Why do we need to differentiate between this being checked at startup versus during processing?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -119,6 +125,28 @@ public void init() {
 
     this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
     LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    this.sspOffsetsToBlockUntil = getOffsetsToBlockUntil();
+    LOG.info("Task {} will catch up to offsets {}", this.taskName, this.sspOffsetsToBlockUntil);
+
+    this.startingOffsets.forEach((ssp, offset) -> checkCaughtUp(ssp, offset, true));
+  }
+
+  /**
+   * Retrieves the newest offset for each SSP
+   *
+   * @return a map of SSP to newest offset
+   */
+  private Map<SystemStreamPartition, String> getOffsetsToBlockUntil() {
+    Map<SystemStreamPartition, String> offsetsToBlockUntil = new HashMap<>();
+    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
+      SystemStreamMetadata metadata = this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);

Review comment:
       minor: it would be good to extract the boolean argument into a named variable for readability.

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -604,6 +597,9 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel
           Map<String, StorageEngine> sideInputStores = getSideInputStores(taskName);
           Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new HashMap<>();
 
+          CountDownLatch taskCountDownLatch = new CountDownLatch(1);
+          this.sideInputTaskLatches.put(taskName, taskCountDownLatch);

Review comment:
       Why do we need per-task latches? Can we not use a single latch sized to the number of side input tasks and check that it reaches 0 instead?

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -154,17 +149,13 @@
   private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
   private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
   private SystemConsumers sideInputSystemConsumers;
-  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
-      = new ConcurrentHashMap<>(); // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
-  private volatile CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
+  private volatile Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread
   private volatile boolean shouldShutdown = false;
+  private RunLoop sideInputRunLoop;
 
   private final ExecutorService sideInputsReadExecutor = Executors.newSingleThreadExecutor(
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_READ_THREAD_NAME).build());
 
-  private final ScheduledExecutorService sideInputsFlushExecutor = Executors.newSingleThreadScheduledExecutor(
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
-  private ScheduledFuture sideInputsFlushFuture;

Review comment:
       CSM originally handled only state restoration, which didn't require any commit/flush semantics. Eventually side inputs were rolled into CSM to pave the way for standby, and side inputs needed to be flushed on our commit cadence. That required a separate timer thread to trigger flush/commit on the task.commit.ms cadence. Instead of adding logic to coordinate signals between the read thread and the timer thread, flush was embedded in the timer thread, and synchronization was introduced between process and commit.
   
   With the run loop, the internal timer thread that the run loop maintains does the job of signaling commit readiness to the task, so you don't need a separate flush thread. One thing to note: previously we could potentially run two operations (process and flush) in parallel for different side inputs. With the new setup, we would need a thread pool of size (number of side input tasks + 1) to ensure parity. We will eventually support parallelism across stores/side input tasks as part of transactional state support (I guess).
   
   So it is fine to leave it as is.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.scheduler.EpochTimeScheduler;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.ReadableCoordinator;
+import org.apache.samza.task.TaskCallback;
+import org.apache.samza.task.TaskCallbackFactory;
+
+
+/**
+ * This class encapsulates the processing logic for side input streams. It is executed by {@link org.apache.samza.container.RunLoop}
+ */
+public class SideInputTask implements RunLoopTask {

Review comment:
       I am not sure I follow the latter part of the comment, @bkonold. The exclusivity holds regardless, because the side input handler synchronizes flush and process. `SideInputTask` in its current state is also unlikely to be impacted, because all it does is delegate calls to the handler and increment metrics.
   
   Handling async commit might be more involved than just synchronizing process and commit; I'd need to think about this a bit more. For now, I'd suggest adding a validation, if possible, that throws when async commit is enabled.
   
   To Manasa's point, the fact that process and commit can happen concurrently doesn't translate into thread-safety concerns for every implementation; some implementations of process and commit may not need synchronization. However, it is useful to call out that synchronization between them was explicitly omitted because of the current implementation.
   
   

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -758,41 +745,50 @@ public void run() {
       sideInputSystemConsumers.register(ssp, startingOffset);
       taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
           ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+      sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+          ssp, ScalaJavaUtil.toScalaFunction(() -> this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+    }
 
-      SystemStreamMetadata systemStreamMetadata = streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
-      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
-          (systemStreamMetadata == null) ? null : systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
+    Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = this.sspSideInputHandlers.values().stream()
+        .distinct()
+        .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, Function.identity()));
 
-      // record a copy of the sspMetadata, to later check if its caught up
-      initialSideInputSSPMetadata.put(ssp, sspMetadata);
+    Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
+    this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+        Set<SystemStreamPartition> taskSSPs = this.taskSideInputStoreSSPs.get(taskName).values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
 
-      // check if the ssp is caught to upcoming, even at start
-      checkSideInputCaughtUp(ssp, startingOffset, SystemStreamMetadata.OffsetType.UPCOMING, false);
-    }
+        RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs, taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+
+        sideInputTasks.put(taskName, sideInputTask);
+      });
 
     // start the systemConsumers for consuming input
     this.sideInputSystemConsumers.start();
 
+    TaskConfig taskConfig = new TaskConfig(this.config);
+    SamzaContainerMetrics sideInputContainerMetrics =
+        new SamzaContainerMetrics(SIDEINPUTS_METRICS_PREFIX + this.samzaContainerMetrics.source(),
+            this.samzaContainerMetrics.registry());
+
+    this.sideInputRunLoop = new RunLoop(sideInputTasks,
+        null, // all operations are executed in the main runloop thread
+        this.sideInputSystemConsumers,
+        1, // single message in flight per task
+        -1, // no windowing
+        taskConfig.getCommitMs(),
+        taskConfig.getCallbackTimeoutMs(),
+        this.config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)),

Review comment:
       Can we extract this into a constant in a common file and use that constant in both places (here and SamzaContainer)?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -119,6 +125,28 @@ public void init() {
 
     this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
     LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    this.sspOffsetsToBlockUntil = getOffsetsToBlockUntil();
+    LOG.info("Task {} will catch up to offsets {}", this.taskName, this.sspOffsetsToBlockUntil);
+
+    this.startingOffsets.forEach((ssp, offset) -> checkCaughtUp(ssp, offset, true));
+  }
+
+  /**
+   * Retrieves the newest offset for each SSP
+   *
+   * @return a map of SSP to newest offset
+   */
+  private Map<SystemStreamPartition, String> getOffsetsToBlockUntil() {
+    Map<SystemStreamPartition, String> offsetsToBlockUntil = new HashMap<>();
+    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
+      SystemStreamMetadata metadata = this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);

Review comment:
       minor: extract `false` into a meaningfully named variable to help readability.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org