You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2022/11/17 00:51:26 UTC

[samza] branch master updated: SAMZA-2769: [PipelineDrain] Introduce enum in DrainNotification to allow for different drain modes (#1642)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d41284226 SAMZA-2769: [PipelineDrain] Introduce enum in DrainNotification to allow for different drain modes (#1642)
d41284226 is described below

commit d41284226430cbbef3377040f1a5e183628856ae
Author: ajo thomas <aj...@linkedin.com>
AuthorDate: Wed Nov 16 16:51:20 2022 -0800

    SAMZA-2769: [PipelineDrain] Introduce enum in DrainNotification to allow for different drain modes (#1642)
---
 .../java/org/apache/samza/drain/DrainMode.java     | 26 ++++------
 .../org/apache/samza/drain/DrainNotification.java  | 32 ++++++++++---
 .../samza/drain/DrainNotificationObjectMapper.java |  5 +-
 .../java/org/apache/samza/drain/DrainUtils.java    | 55 ++++++++++------------
 .../drain/DrainNotificationObjectMapperTests.java  |  2 +-
 .../org/apache/samza/drain/DrainUtilsTests.java    | 34 +++++++------
 6 files changed, 80 insertions(+), 74 deletions(-)

diff --git a/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java b/samza-api/src/main/java/org/apache/samza/drain/DrainMode.java
similarity index 52%
copy from samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
copy to samza-api/src/main/java/org/apache/samza/drain/DrainMode.java
index 05c6b7da2..847b0e8c3 100644
--- a/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
+++ b/samza-api/src/main/java/org/apache/samza/drain/DrainMode.java
@@ -18,24 +18,14 @@
  */
 package org.apache.samza.drain;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.util.UUID;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
 /**
- * Tests for {@link DrainNotificationObjectMapper}
+ * Defines the type of drain operation.
  * */
-public class DrainNotificationObjectMapperTests {
-  @Test
-  public void testDrainNotificationSerde() throws IOException {
-    UUID uuid = UUID.randomUUID();
-    DrainNotification originalMessage = new DrainNotification(uuid, "foobar");
-    ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
-    byte[] bytes = objectMapper.writeValueAsBytes(originalMessage);
-    DrainNotification deserializedMessage = objectMapper.readValue(bytes, DrainNotification.class);
-    assertEquals(originalMessage, deserializedMessage);
-  }
+public enum DrainMode {
+  /**
+   * This mode signifies the default behavior of the drain operation.
+   * All intermediate streams and any Samza-managed state will be drained and the pipeline will be gracefully shut down
+   * as a result. User state will not be drained.
+   * */
+  DEFAULT
 }
diff --git a/samza-core/src/main/java/org/apache/samza/drain/DrainNotification.java b/samza-api/src/main/java/org/apache/samza/drain/DrainNotification.java
similarity index 66%
rename from samza-core/src/main/java/org/apache/samza/drain/DrainNotification.java
rename to samza-api/src/main/java/org/apache/samza/drain/DrainNotification.java
index dd97cd7a5..7cd65315c 100644
--- a/samza-core/src/main/java/org/apache/samza/drain/DrainNotification.java
+++ b/samza-api/src/main/java/org/apache/samza/drain/DrainNotification.java
@@ -19,11 +19,11 @@
 package org.apache.samza.drain;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import java.util.UUID;
 
 /**
- * DrainNotification is a custom message is used by an external controller to trigger Drain.
- * The message is written in the metadata store using {@link DrainUtils}.
+ * DrainNotification is a custom message used by an external controller to trigger Drain.
  * */
 public class DrainNotification {
   /**
@@ -35,9 +35,23 @@ public class DrainNotification {
    */
   private final String runId;
 
-  public DrainNotification(UUID uuid, String runId) {
+  /** The type of drain operation requested by this notification. */
+  private final DrainMode drainMode;
+
+  public DrainNotification(UUID uuid, String runId, DrainMode drainMode) {
+    Preconditions.checkNotNull(uuid);
+    Preconditions.checkNotNull(runId);
+    Preconditions.checkNotNull(drainMode);
     this.uuid = uuid;
     this.runId = runId;
+    this.drainMode = drainMode;
+  }
+
+  /**
+   * Creates a DrainNotification in {@link DrainMode#DEFAULT} mode.
+   * */
+  public static DrainNotification create(UUID uuid, String runId) {
+    return new DrainNotification(uuid, runId, DrainMode.DEFAULT);
   }
 
   public UUID getUuid() {
@@ -48,11 +62,16 @@ public class DrainNotification {
     return runId;
   }
 
+  public DrainMode getDrainMode() {
+    return drainMode;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("DrainMessage{");
     sb.append(" UUID: ").append(uuid);
-    sb.append(", runId: '").append(runId).append('\'');
+    sb.append(", runId: ").append(runId);
+    sb.append(", drainMode: ").append(drainMode);
     sb.append('}');
     return sb.toString();
   }
@@ -67,11 +86,12 @@ public class DrainNotification {
     }
     DrainNotification that = (DrainNotification) o;
     return Objects.equal(uuid, that.uuid)
-        && Objects.equal(runId, that.runId);
+        && Objects.equal(runId, that.runId)
+        && Objects.equal(drainMode, that.drainMode);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(uuid, runId);
+    return Objects.hashCode(uuid, runId, drainMode);
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/drain/DrainNotificationObjectMapper.java b/samza-core/src/main/java/org/apache/samza/drain/DrainNotificationObjectMapper.java
index 72cdfbb05..60245b4d1 100644
--- a/samza-core/src/main/java/org/apache/samza/drain/DrainNotificationObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/drain/DrainNotificationObjectMapper.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.UUID;
 
@@ -68,6 +69,7 @@ public class DrainNotificationObjectMapper {
       Map<String, Object> drainMessageMap = new HashMap<>();
       drainMessageMap.put("uuid", value.getUuid().toString());
       drainMessageMap.put("runId", value.getRunId());
+      drainMessageMap.put("drainMode", value.getDrainMode().toString());
       jsonGenerator.writeObject(drainMessageMap);
     }
   }
@@ -80,7 +82,8 @@ public class DrainNotificationObjectMapper {
       JsonNode node = oc.readTree(jsonParser);
       UUID uuid = UUID.fromString(node.get("uuid").textValue());
       String runId = node.get("runId").textValue();
-      return new DrainNotification(uuid, runId);
+      DrainMode drainMode = DrainMode.valueOf(node.get("drainMode").textValue().toUpperCase(Locale.ROOT));
+      return new DrainNotification(uuid, runId, drainMode);
     }
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java b/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
index 66449327e..6883b75fe 100644
--- a/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
@@ -45,35 +45,6 @@ public class DrainUtils {
   private DrainUtils() {
   }
 
-  /**
-   * Writes a {@link DrainNotification} for a given runId to the underlying metastore. This method should be used by
-   * external controllers to issue a DrainNotification to the JobCoordinator and Samza Containers.
-   * @param metadataStore Metadata store to write drain notification to.
-   * @param runId runId for the DrainNotification
-   *
-   * @return generated uuid for the DrainNotification
-   */
-  @VisibleForTesting
-  public static UUID writeDrainNotification(MetadataStore metadataStore, String runId) {
-    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(runId), "runId should be non-null.");
-    LOG.info("Attempting to write DrainNotification to metadata-store for the deployment ID {}", runId);
-    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
-        new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE);
-    final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
-    final UUID uuid = UUID.randomUUID();
-    final DrainNotification message = new DrainNotification(uuid, runId);
-    try {
-      drainMetadataStore.put(message.getUuid().toString(), objectMapper.writeValueAsBytes(message));
-      drainMetadataStore.flush();
-      LOG.info("DrainNotification with id {} written to metadata-store for the deployment ID {}", uuid, runId);
-    } catch (Exception ex) {
-      throw new SamzaException(
-          String.format("DrainNotification might have been not written to metastore %s", message), ex);
-    }
-    return uuid;
-  }
-
   /**
    * Writes a {@link DrainNotification} to the underlying metastore. This method should be used by external controllers
    * to issue a DrainNotification to the JobCoordinator and Samza Containers.
@@ -91,7 +62,31 @@ public class DrainUtils {
           + "written to the metadata store.");
     }
     LOG.info("Received runId {}", runId);
-    return writeDrainNotification(metadataStore, runId);
+    LOG.info("Attempting to write DrainNotification to metadata-store for the deployment ID {}", runId);
+    final UUID uuid = UUID.randomUUID();
+    final DrainNotification message = DrainNotification.create(uuid, runId);
+    return writeDrainNotification(metadataStore, message);
+  }
+
+  /**
+   * Writes the given {@link DrainNotification} to the drain namespace of the metadata store and returns its uuid.
+   * */
+  @VisibleForTesting
+  static UUID writeDrainNotification(MetadataStore metadataStore, DrainNotification drainNotification) {
+    Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
+    Preconditions.checkArgument(drainNotification != null, "DrainNotification cannot be null.");
+    final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
+        new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE);
+    final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
+    try {
+      drainMetadataStore.put(drainNotification.getUuid().toString(), objectMapper.writeValueAsBytes(drainNotification));
+      drainMetadataStore.flush();
+      LOG.info("DrainNotification with id {} written to metadata-store for the deployment ID {}", drainNotification.getUuid(), drainNotification.getRunId());
+    } catch (Exception ex) {
+      throw new SamzaException(
+          String.format("DrainNotification might have been not written to metastore %s", drainNotification), ex);
+    }
+    return drainNotification.getUuid();
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java b/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
index 05c6b7da2..757d09309 100644
--- a/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
+++ b/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
@@ -32,7 +32,7 @@ public class DrainNotificationObjectMapperTests {
   @Test
   public void testDrainNotificationSerde() throws IOException {
     UUID uuid = UUID.randomUUID();
-    DrainNotification originalMessage = new DrainNotification(uuid, "foobar");
+    DrainNotification originalMessage = DrainNotification.create(uuid, "foobar");
     ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
     byte[] bytes = objectMapper.writeValueAsBytes(originalMessage);
     DrainNotification deserializedMessage = objectMapper.readValue(bytes, DrainNotification.class);
diff --git a/samza-core/src/test/java/org/apache/samza/drain/DrainUtilsTests.java b/samza-core/src/test/java/org/apache/samza/drain/DrainUtilsTests.java
index 43b91ad43..cc5b8a625 100644
--- a/samza-core/src/test/java/org/apache/samza/drain/DrainUtilsTests.java
+++ b/samza-core/src/test/java/org/apache/samza/drain/DrainUtilsTests.java
@@ -69,45 +69,43 @@ public class DrainUtilsTests {
   @Test
   public void testWrites() {
     String runId1 = "foo1";
-    String runId2 = "foo2";
-    String runId3 = "foo3";
 
-    UUID uuid1 = DrainUtils.writeDrainNotification(coordinatorStreamStore, runId1);
-    UUID uuid2 = DrainUtils.writeDrainNotification(coordinatorStreamStore, runId2);
-    UUID uuid3 = DrainUtils.writeDrainNotification(coordinatorStreamStore, runId3);
+    DrainNotification drainNotification1 = DrainNotification.create(UUID.randomUUID(), runId1);
+    UUID uuid1 = DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification1);
 
-    DrainNotification expectedDrainNotification1 = new DrainNotification(uuid1, runId1);
-    DrainNotification expectedDrainNotification2 = new DrainNotification(uuid2, runId2);
-    DrainNotification expectedDrainNotification3 = new DrainNotification(uuid3, runId3);
-    Set<DrainNotification> expectedDrainNotifications = new HashSet<>(Arrays.asList(expectedDrainNotification1,
-        expectedDrainNotification2, expectedDrainNotification3));
+    DrainNotification expectedDrainNotification1 = DrainNotification.create(uuid1, runId1);
+    Set<DrainNotification> expectedDrainNotifications = new HashSet<>(Arrays.asList(expectedDrainNotification1));
 
     Optional<List<DrainNotification>> drainNotifications = readDrainNotificationMessages(coordinatorStreamStore);
     Assert.assertTrue(drainNotifications.isPresent());
-    Assert.assertEquals(3, drainNotifications.get().size());
+    Assert.assertEquals(1, drainNotifications.get().size());
     Assert.assertEquals(expectedDrainNotifications, new HashSet<>(drainNotifications.get()));
   }
 
   @Test
   public void testCleanup() {
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
+    DrainNotification drainNotification1 = DrainNotification.create(UUID.randomUUID(), TEST_RUN_ID);
+    DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification1);
     DrainUtils.cleanup(coordinatorStreamStore, CONFIG);
     final Optional<List<DrainNotification>> drainNotifications1 = readDrainNotificationMessages(coordinatorStreamStore);
     Assert.assertFalse(drainNotifications1.isPresent());
 
-    final String runId = "bar";
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, runId);
+    final String runId2 = "bar";
+    DrainNotification drainNotification2 = DrainNotification.create(UUID.randomUUID(), runId2);
+    DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification2);
     DrainUtils.cleanup(coordinatorStreamStore, CONFIG);
     final Optional<List<DrainNotification>> drainNotifications2 = readDrainNotificationMessages(coordinatorStreamStore);
     Assert.assertTrue(drainNotifications2.isPresent());
-    Assert.assertEquals(runId, drainNotifications2.get().get(0).getRunId());
+    Assert.assertEquals(runId2, drainNotifications2.get().get(0).getRunId());
   }
 
   @Test
   public void testCleanupAll() {
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
-    DrainUtils.writeDrainNotification(coordinatorStreamStore, "bar");
+    DrainNotification drainNotification1 = DrainNotification.create(UUID.randomUUID(), TEST_RUN_ID);
+    DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification1);
+    final String runId2 = "bar";
+    DrainNotification drainNotification2 = DrainNotification.create(UUID.randomUUID(), runId2);
+    DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification2);
     DrainUtils.cleanupAll(coordinatorStreamStore);
     final Optional<List<DrainNotification>> drainNotifications = readDrainNotificationMessages(coordinatorStreamStore);
     Assert.assertFalse(drainNotifications.isPresent());