Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/05/13 22:17:53 UTC

[GitHub] [samza] xinyuiscool commented on a diff in pull request #1605: SAMZA-2742: [Pipeline Drain] [WIP] Add Drain components and integrate them with SamzaContainer and JC

xinyuiscool commented on code in PR #1605:
URL: https://github.com/apache/samza/pull/1605#discussion_r872851658


##########
samza-core/src/main/java/org/apache/samza/drain/DrainManager.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DrainManager reads and writes {@link DrainNotification} to the provided {@link MetadataStore}.
+ *
+ * It uses {@link NamespaceAwareCoordinatorStreamStore} for namespace-aware read/write/delete operations on the
+ * metadata store.
+ * */
+public class DrainManager {
+  private static final Logger LOG = LoggerFactory.getLogger(DrainManager.class);
+
+  private static final Integer VERSION = 1;
+  // namespace for the underlying metadata store
+  private static final String DRAIN_METADATA_STORE_NAMESPACE = "samza-drain-v" + VERSION;
+
+  private final NamespaceAwareCoordinatorStreamStore drainMetadataStore;
+  private final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
+
+  private boolean running = false;
+
+  public DrainManager(MetadataStore metadataStore) {
+    Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
+    this.drainMetadataStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE);
+  }
+
+  /**
+   * Perform startup operations.
+   */
+  public void start() {
+    if (running) {
+      LOG.warn("DrainManager already started.");
+    } else {
+      drainMetadataStore.init();
+      running = true;
+      LOG.info("Started DrainManager.");
+    }
+  }
+
+  /**
+   * Perform teardown operations.
+   */
+  public void stop() {
+    if (running) {
+      drainMetadataStore.close();
+      running = false;
+      LOG.info("Stopped DrainManager.");
+    } else {
+      LOG.warn("DrainManager already stopped.");
+    }
+  }
+
+  /**
+   * Writes a {@link DrainNotification} to the underlying metastore. This method should be used by external controllers
+   * to issue a DrainNotification to the JobCoordinator and Samza Containers.
+   *
+   * @return uuid generated for the request
+   */
+  public UUID writeDrainNotification(String deploymentId) {

Review Comment:
   This DrainManager seems hard to use from external controllers. Is your plan to provide a utility class with a main() method so that it can be invoked from a script to write drain messages? For an example, see checkpoint-tool.sh.
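To make the suggestion concrete, here is a minimal sketch of such a command-line wrapper. `DrainNotificationWriter`, `InMemoryDrainWriter`, `DrainTool`, and the `--deployment-id` flag are hypothetical names chosen for illustration; only `start()`, `stop()`, and `writeDrainNotification(String)` mirror the DrainManager methods quoted in the diff. A real tool would presumably construct a DrainManager over the job's MetadataStore instead of the in-memory stand-in used here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for DrainManager: only the lifecycle and write method quoted in the diff.
interface DrainNotificationWriter {
  void start();
  void stop();
  UUID writeDrainNotification(String deploymentId);
}

// Hypothetical in-memory writer so the sketch runs without a coordinator stream.
class InMemoryDrainWriter implements DrainNotificationWriter {
  final Map<UUID, String> written = new HashMap<>();
  boolean running = false;

  public void start() {
    running = true;
  }

  public void stop() {
    running = false;
  }

  public UUID writeDrainNotification(String deploymentId) {
    if (!running) {
      throw new IllegalStateException("writer not started");
    }
    UUID uuid = UUID.randomUUID();
    written.put(uuid, deploymentId);
    return uuid;
  }
}

// Hypothetical CLI entry point in the spirit of checkpoint-tool.sh.
class DrainTool {
  static String parseDeploymentId(String[] args) {
    for (int i = 0; i < args.length - 1; i++) {
      if ("--deployment-id".equals(args[i])) {
        return args[i + 1];
      }
    }
    throw new IllegalArgumentException("Usage: drain-tool --deployment-id <id>");
  }

  static UUID run(DrainNotificationWriter writer, String[] args) {
    // Parse first so a usage error never opens the store.
    String deploymentId = parseDeploymentId(args);
    writer.start();
    try {
      return writer.writeDrainNotification(deploymentId);
    } finally {
      writer.stop(); // always release the store, even if the write fails
    }
  }

  public static void main(String[] args) {
    // A real tool would build a DrainManager over the job's MetadataStore here (assumption).
    UUID uuid = run(new InMemoryDrainWriter(), args);
    System.out.println("Wrote drain notification " + uuid);
  }
}
```

A shell script could then invoke `DrainTool` with the job's config and deployment id, much like checkpoint-tool.sh wraps its Java entry point.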



##########
samza-core/src/main/java/org/apache/samza/drain/DrainMonitor.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+/**
+ * DrainMonitor implementations are intended to monitor the MetadataStore for {@link DrainNotification} and invoke
+ * the {@link Callback}.
+ * */
+public interface DrainMonitor {
+  /**
+   * Starts the DrainMonitor.
+   * */
+  void start();
+
+  /**
+   * Stops the DrainMonitor.
+   * */
+  void stop();
+
+  /**
+   * Register a callback to be executed when DrainNotification is encountered.
+   *
+   * @param callback the callback to register.
+   * @return Returns {@code true} if registration was successful and {@code false} if not.
+   * Registration can fail if the DrainMonitor is stopped or a callback is already registered.
+   * */
+  boolean registerCallback(Callback callback);

Review Comment:
   The interface is a bit hard to understand here. I would expect DrainMonitor itself to be the callback that triggers the subsequent drain actions. Instead, the monitor holds a callback that is passed in from somewhere else, which is quite counterintuitive.
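For reference, the registration contract described in the Javadoc (registration fails if the monitor is stopped or a callback is already registered) can be sketched as follows. `DrainCallback`, `RegistrationSketch`, and `notifyDrain()` are hypothetical names, and the sketch assumes Callback exposes only `onDrain()`; it illustrates the documented behavior, not the PR's actual implementation.

```java
import java.util.Objects;

// Assumed shape of DrainMonitor.Callback: a single onDrain() hook.
interface DrainCallback {
  void onDrain();
}

// Hypothetical sketch of the contract documented on DrainMonitor#registerCallback.
class RegistrationSketch {
  private DrainCallback callback;
  private boolean stopped = false;

  synchronized void stop() {
    stopped = true;
  }

  // Returns false if the monitor is stopped or a callback is already registered.
  synchronized boolean registerCallback(DrainCallback cb) {
    Objects.requireNonNull(cb, "callback cannot be null");
    if (stopped || callback != null) {
      return false;
    }
    callback = cb;
    return true;
  }

  // Invoked by the monitor when a DrainNotification for the current deployment is found.
  void notifyDrain() {
    DrainCallback cb;
    synchronized (this) {
      cb = callback;
    }
    if (cb != null) {
      cb.onDrain(); // run outside the lock so the callback cannot block registration or stop()
    }
  }
}
```

The methods are synchronized so the "stopped" check and the registration are atomic with respect to `stop()`. The sketch also makes the indirection visible: the monitor tracks a callback slot alongside its lifecycle, and every caller has to check the boolean result.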



##########
samza-core/src/main/java/org/apache/samza/drain/SinglePollDrainMonitor.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.drain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * {@link DrainMonitor} implementation which polls the {@link DrainManager} for {@link DrainNotification} exactly one
+ * time.
+ *
+ * One time poll is triggered by calling {@link #start()}. The poll and callback invocation happens on the caller's
+ * thread.
+ *
+ * This is intended to be used by SamzaContainer to check if the container should be set in drain mode prior to starting
+ * the registered consumers. This particularly applies for cases where the container did not complete the previous drain
+ * request and should immediately drain upon restart without consuming any new messages.
+ *
+ * {@link DrainMonitor.Callback} should be registered prior to calling {@link #start()}.
+ * Callback's {@link Callback#onDrain()} method is called by the monitor when DrainNotification matching the current
+ * deployment id is encountered.
+ * */
+public class SinglePollDrainMonitor implements DrainMonitor {

Review Comment:
   Is the only difference between this DrainMonitor and PollingDrainMonitor that the latter keeps track of state and continues polling? If so, it seems we could merge them into one.
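One way the two monitors might be merged is a single class that performs the synchronous poll on the caller's thread in `start()`, as SinglePollDrainMonitor does, and then optionally keeps polling in the background. This is a minimal sketch under stated assumptions: `UnifiedDrainMonitor` is a hypothetical name, a `Supplier<Optional<String>>` returning the deployment id of the latest notification stands in for reading DrainNotifications through DrainManager, and a poll interval of zero or less selects single-poll behavior.

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical merged monitor: one synchronous poll in start(), plus optional background polling.
class UnifiedDrainMonitor {
  // Assumed stand-in for DrainManager: deployment id of the latest drain notification, if any.
  private final Supplier<Optional<String>> latestDrainDeploymentId;
  private final String deploymentId;
  private final Runnable onDrain;
  private final long pollIntervalMs; // <= 0 selects single-poll behavior
  private final AtomicBoolean drained = new AtomicBoolean(false);
  private ScheduledExecutorService scheduler;

  UnifiedDrainMonitor(Supplier<Optional<String>> latestDrainDeploymentId, String deploymentId,
      Runnable onDrain, long pollIntervalMs) {
    this.latestDrainDeploymentId = latestDrainDeploymentId;
    this.deploymentId = deploymentId;
    this.onDrain = onDrain;
    this.pollIntervalMs = pollIntervalMs;
  }

  // Polls once on the caller's thread; returns true if a matching notification was found.
  synchronized boolean start() {
    boolean found = pollOnce();
    if (!found && pollIntervalMs > 0) {
      scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "drain-monitor-poll");
        t.setDaemon(true); // do not keep the JVM alive just for polling
        return t;
      });
      scheduler.scheduleWithFixedDelay(this::pollOnce, pollIntervalMs, pollIntervalMs, TimeUnit.MILLISECONDS);
    }
    return found;
  }

  private boolean pollOnce() {
    if (drained.get()) {
      return true; // already drained; skip reading the store again
    }
    boolean matches = latestDrainDeploymentId.get().filter(deploymentId::equals).isPresent();
    if (matches && drained.compareAndSet(false, true)) {
      onDrain.run(); // invoked at most once
    }
    return drained.get();
  }

  synchronized void stop() {
    if (scheduler != null) {
      scheduler.shutdownNow();
      scheduler = null;
    }
  }
}
```

With `pollIntervalMs <= 0` this behaves like the single-poll monitor (poll and callback on the caller's thread); with a positive interval it adds the continuous polling that PollingDrainMonitor provides, and the `AtomicBoolean` keeps the callback from firing more than once across polls.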



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.