You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2022/11/17 00:51:26 UTC
[samza] branch master updated: SAMZA-2769: [PipelineDrain] Introduce enum in DrainNotification to allow for different drain modes (#1642)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d41284226 SAMZA-2769: [PipelineDrain] Introduce enum in DrainNotification to allow for different drain modes (#1642)
d41284226 is described below
commit d41284226430cbbef3377040f1a5e183628856ae
Author: ajo thomas <aj...@linkedin.com>
AuthorDate: Wed Nov 16 16:51:20 2022 -0800
SAMZA-2769: [PipelineDrain] Introduce enum in DrainNotification to allow for different drain modes (#1642)
---
.../java/org/apache/samza/drain/DrainMode.java | 26 ++++------
.../org/apache/samza/drain/DrainNotification.java | 32 ++++++++++---
.../samza/drain/DrainNotificationObjectMapper.java | 5 +-
.../java/org/apache/samza/drain/DrainUtils.java | 55 ++++++++++------------
.../drain/DrainNotificationObjectMapperTests.java | 2 +-
.../org/apache/samza/drain/DrainUtilsTests.java | 34 +++++++------
6 files changed, 80 insertions(+), 74 deletions(-)
diff --git a/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java b/samza-api/src/main/java/org/apache/samza/drain/DrainMode.java
similarity index 52%
copy from samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
copy to samza-api/src/main/java/org/apache/samza/drain/DrainMode.java
index 05c6b7da2..847b0e8c3 100644
--- a/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
+++ b/samza-api/src/main/java/org/apache/samza/drain/DrainMode.java
@@ -18,24 +18,14 @@
*/
package org.apache.samza.drain;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.util.UUID;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
/**
- * Tests for {@link DrainNotificationObjectMapper}
+ * Defines the type of drain operation.
* */
-public class DrainNotificationObjectMapperTests {
- @Test
- public void testDrainNotificationSerde() throws IOException {
- UUID uuid = UUID.randomUUID();
- DrainNotification originalMessage = new DrainNotification(uuid, "foobar");
- ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
- byte[] bytes = objectMapper.writeValueAsBytes(originalMessage);
- DrainNotification deserializedMessage = objectMapper.readValue(bytes, DrainNotification.class);
- assertEquals(originalMessage, deserializedMessage);
- }
+public enum DrainMode {
+ /**
+ * This mode signifies the default behavior of the drain operation.
+ * All intermediate streams and any samza managed state will be drained and the pipeline will be gracefully shutdown
+ * as a result. User state will not be drained.
+ * */
+ DEFAULT
}
diff --git a/samza-core/src/main/java/org/apache/samza/drain/DrainNotification.java b/samza-api/src/main/java/org/apache/samza/drain/DrainNotification.java
similarity index 66%
rename from samza-core/src/main/java/org/apache/samza/drain/DrainNotification.java
rename to samza-api/src/main/java/org/apache/samza/drain/DrainNotification.java
index dd97cd7a5..7cd65315c 100644
--- a/samza-core/src/main/java/org/apache/samza/drain/DrainNotification.java
+++ b/samza-api/src/main/java/org/apache/samza/drain/DrainNotification.java
@@ -19,11 +19,11 @@
package org.apache.samza.drain;
import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
import java.util.UUID;
/**
- * DrainNotification is a custom message is used by an external controller to trigger Drain.
- * The message is written in the metadata store using {@link DrainUtils}.
+ * DrainNotification is a custom message used by an external controller to trigger Drain.
* */
public class DrainNotification {
/**
@@ -35,9 +35,23 @@ public class DrainNotification {
*/
private final String runId;
- public DrainNotification(UUID uuid, String runId) {
+ /** The type of drain operation requested by this notification. */
+ private final DrainMode drainMode;
+
+ public DrainNotification(UUID uuid, String runId, DrainMode drainMode) {
+ Preconditions.checkNotNull(uuid);
+ Preconditions.checkNotNull(runId);
+ Preconditions.checkNotNull(drainMode);
this.uuid = uuid;
this.runId = runId;
+ this.drainMode = drainMode;
+ }
+
+ /**
+ * Creates a DrainNotification in {@link DrainMode#DEFAULT} mode.
+ * */
+ public static DrainNotification create(UUID uuid, String runId) {
+ return new DrainNotification(uuid, runId, DrainMode.DEFAULT);
}
public UUID getUuid() {
@@ -48,11 +62,16 @@ public class DrainNotification {
return runId;
}
+ public DrainMode getDrainMode() {
+ return drainMode;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DrainMessage{");
sb.append(" UUID: ").append(uuid);
- sb.append(", runId: '").append(runId).append('\'');
+ sb.append(", runId: ").append(runId);
+ sb.append(", drainMode: ").append(drainMode);
sb.append('}');
return sb.toString();
}
@@ -67,11 +86,12 @@ public class DrainNotification {
}
DrainNotification that = (DrainNotification) o;
return Objects.equal(uuid, that.uuid)
- && Objects.equal(runId, that.runId);
+ && Objects.equal(runId, that.runId)
+ && Objects.equal(drainMode, that.drainMode);
}
@Override
public int hashCode() {
- return Objects.hashCode(uuid, runId);
+ return Objects.hashCode(uuid, runId, drainMode);
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/drain/DrainNotificationObjectMapper.java b/samza-core/src/main/java/org/apache/samza/drain/DrainNotificationObjectMapper.java
index 72cdfbb05..60245b4d1 100644
--- a/samza-core/src/main/java/org/apache/samza/drain/DrainNotificationObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/drain/DrainNotificationObjectMapper.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import java.util.UUID;
@@ -68,6 +69,7 @@ public class DrainNotificationObjectMapper {
Map<String, Object> drainMessageMap = new HashMap<>();
drainMessageMap.put("uuid", value.getUuid().toString());
drainMessageMap.put("runId", value.getRunId());
+ drainMessageMap.put("drainMode", value.getDrainMode().toString());
jsonGenerator.writeObject(drainMessageMap);
}
}
@@ -80,7 +82,8 @@ public class DrainNotificationObjectMapper {
JsonNode node = oc.readTree(jsonParser);
UUID uuid = UUID.fromString(node.get("uuid").textValue());
String runId = node.get("runId").textValue();
- return new DrainNotification(uuid, runId);
+ DrainMode drainMode = DrainMode.valueOf(node.get("drainMode").textValue().toUpperCase(Locale.ROOT));
+ return new DrainNotification(uuid, runId, drainMode);
}
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java b/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
index 66449327e..6883b75fe 100644
--- a/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/drain/DrainUtils.java
@@ -45,35 +45,6 @@ public class DrainUtils {
private DrainUtils() {
}
- /**
- * Writes a {@link DrainNotification} for a given runId to the underlying metastore. This method should be used by
- * external controllers to issue a DrainNotification to the JobCoordinator and Samza Containers.
- * @param metadataStore Metadata store to write drain notification to.
- * @param runId runId for the DrainNotification
- *
- * @return generated uuid for the DrainNotification
- */
- @VisibleForTesting
- public static UUID writeDrainNotification(MetadataStore metadataStore, String runId) {
- Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(runId), "runId should be non-null.");
- LOG.info("Attempting to write DrainNotification to metadata-store for the deployment ID {}", runId);
- final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
- new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE);
- final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
- final UUID uuid = UUID.randomUUID();
- final DrainNotification message = new DrainNotification(uuid, runId);
- try {
- drainMetadataStore.put(message.getUuid().toString(), objectMapper.writeValueAsBytes(message));
- drainMetadataStore.flush();
- LOG.info("DrainNotification with id {} written to metadata-store for the deployment ID {}", uuid, runId);
- } catch (Exception ex) {
- throw new SamzaException(
- String.format("DrainNotification might have been not written to metastore %s", message), ex);
- }
- return uuid;
- }
-
/**
* Writes a {@link DrainNotification} to the underlying metastore. This method should be used by external controllers
* to issue a DrainNotification to the JobCoordinator and Samza Containers.
@@ -91,7 +62,31 @@ public class DrainUtils {
+ "written to the metadata store.");
}
LOG.info("Received runId {}", runId);
- return writeDrainNotification(metadataStore, runId);
+ LOG.info("Attempting to write DrainNotification to metadata-store for the deployment ID {}", runId);
+ final UUID uuid = UUID.randomUUID();
+ final DrainNotification message = DrainNotification.create(uuid, runId);
+ return writeDrainNotification(metadataStore, message);
+ }
+
+ /**
+ * Writes a {@link DrainNotification} to an underlying metadata store.
+ * */
+ @VisibleForTesting
+ static UUID writeDrainNotification(MetadataStore metadataStore, DrainNotification drainNotification) {
+ Preconditions.checkArgument(metadataStore != null, "MetadataStore cannot be null.");
+ Preconditions.checkArgument(drainNotification != null, "DrainNotification cannot be null.");
+ final NamespaceAwareCoordinatorStreamStore drainMetadataStore =
+ new NamespaceAwareCoordinatorStreamStore(metadataStore, DRAIN_METADATA_STORE_NAMESPACE);
+ final ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
+ try {
+ drainMetadataStore.put(drainNotification.getUuid().toString(), objectMapper.writeValueAsBytes(drainNotification));
+ drainMetadataStore.flush();
+ LOG.info("DrainNotification with id {} written to metadata-store for the deployment ID {}", drainNotification.getUuid(), drainNotification.getRunId());
+ } catch (Exception ex) {
+ throw new SamzaException(
+ String.format("DrainNotification might have been not written to metastore %s", drainNotification), ex);
+ }
+ return drainNotification.getUuid();
}
/**
diff --git a/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java b/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
index 05c6b7da2..757d09309 100644
--- a/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
+++ b/samza-core/src/test/java/org/apache/samza/drain/DrainNotificationObjectMapperTests.java
@@ -32,7 +32,7 @@ public class DrainNotificationObjectMapperTests {
@Test
public void testDrainNotificationSerde() throws IOException {
UUID uuid = UUID.randomUUID();
- DrainNotification originalMessage = new DrainNotification(uuid, "foobar");
+ DrainNotification originalMessage = DrainNotification.create(uuid, "foobar");
ObjectMapper objectMapper = DrainNotificationObjectMapper.getObjectMapper();
byte[] bytes = objectMapper.writeValueAsBytes(originalMessage);
DrainNotification deserializedMessage = objectMapper.readValue(bytes, DrainNotification.class);
diff --git a/samza-core/src/test/java/org/apache/samza/drain/DrainUtilsTests.java b/samza-core/src/test/java/org/apache/samza/drain/DrainUtilsTests.java
index 43b91ad43..cc5b8a625 100644
--- a/samza-core/src/test/java/org/apache/samza/drain/DrainUtilsTests.java
+++ b/samza-core/src/test/java/org/apache/samza/drain/DrainUtilsTests.java
@@ -69,45 +69,43 @@ public class DrainUtilsTests {
@Test
public void testWrites() {
String runId1 = "foo1";
- String runId2 = "foo2";
- String runId3 = "foo3";
- UUID uuid1 = DrainUtils.writeDrainNotification(coordinatorStreamStore, runId1);
- UUID uuid2 = DrainUtils.writeDrainNotification(coordinatorStreamStore, runId2);
- UUID uuid3 = DrainUtils.writeDrainNotification(coordinatorStreamStore, runId3);
+ DrainNotification drainNotification1 = DrainNotification.create(UUID.randomUUID(), runId1);
+ UUID uuid1 = DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification1);
- DrainNotification expectedDrainNotification1 = new DrainNotification(uuid1, runId1);
- DrainNotification expectedDrainNotification2 = new DrainNotification(uuid2, runId2);
- DrainNotification expectedDrainNotification3 = new DrainNotification(uuid3, runId3);
- Set<DrainNotification> expectedDrainNotifications = new HashSet<>(Arrays.asList(expectedDrainNotification1,
- expectedDrainNotification2, expectedDrainNotification3));
+ DrainNotification expectedDrainNotification1 = DrainNotification.create(uuid1, runId1);
+ Set<DrainNotification> expectedDrainNotifications = new HashSet<>(Arrays.asList(expectedDrainNotification1));
Optional<List<DrainNotification>> drainNotifications = readDrainNotificationMessages(coordinatorStreamStore);
Assert.assertTrue(drainNotifications.isPresent());
- Assert.assertEquals(3, drainNotifications.get().size());
+ Assert.assertEquals(1, drainNotifications.get().size());
Assert.assertEquals(expectedDrainNotifications, new HashSet<>(drainNotifications.get()));
}
@Test
public void testCleanup() {
- DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
+ DrainNotification drainNotification1 = DrainNotification.create(UUID.randomUUID(), TEST_RUN_ID);
+ DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification1);
DrainUtils.cleanup(coordinatorStreamStore, CONFIG);
final Optional<List<DrainNotification>> drainNotifications1 = readDrainNotificationMessages(coordinatorStreamStore);
Assert.assertFalse(drainNotifications1.isPresent());
- final String runId = "bar";
- DrainUtils.writeDrainNotification(coordinatorStreamStore, runId);
+ final String runId2 = "bar";
+ DrainNotification drainNotification2 = DrainNotification.create(UUID.randomUUID(), runId2);
+ DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification2);
DrainUtils.cleanup(coordinatorStreamStore, CONFIG);
final Optional<List<DrainNotification>> drainNotifications2 = readDrainNotificationMessages(coordinatorStreamStore);
Assert.assertTrue(drainNotifications2.isPresent());
- Assert.assertEquals(runId, drainNotifications2.get().get(0).getRunId());
+ Assert.assertEquals(runId2, drainNotifications2.get().get(0).getRunId());
}
@Test
public void testCleanupAll() {
- DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
- DrainUtils.writeDrainNotification(coordinatorStreamStore, TEST_RUN_ID);
- DrainUtils.writeDrainNotification(coordinatorStreamStore, "bar");
+ DrainNotification drainNotification1 = DrainNotification.create(UUID.randomUUID(), TEST_RUN_ID);
+ DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification1);
+ final String runId2 = "bar";
+ DrainNotification drainNotification2 = DrainNotification.create(UUID.randomUUID(), runId2);
+ DrainUtils.writeDrainNotification(coordinatorStreamStore, drainNotification2);
DrainUtils.cleanupAll(coordinatorStreamStore);
final Optional<List<DrainNotification>> drainNotifications = readDrainNotificationMessages(coordinatorStreamStore);
Assert.assertFalse(drainNotifications.isPresent());