You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/09/12 16:56:55 UTC
[gobblin] branch master updated: [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d0e530a78 [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
d0e530a78 is described below
commit d0e530a78ae0417ea6e3ab6aeaeb4e2f260e227f
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Mon Sep 12 09:56:48 2022 -0700
[GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
* [GOBBLIN-1673] Schema for dynamic work unit message
* [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
---
.../runtime/messaging/DynamicWorkUnitConsumer.java | 65 ++++++++++++
.../runtime/messaging/DynamicWorkUnitProducer.java | 56 +++++++++++
.../gobblin/runtime/messaging/MessageBuffer.java | 82 +++++++++++++++
.../DynamicWorkUnitDeserializationException.java | 30 ++++++
.../messaging/data/DynamicWorkUnitMessage.java | 45 +++++++++
.../messaging/data/DynamicWorkUnitSerde.java | 111 +++++++++++++++++++++
.../messaging/data/SplitWorkUnitMessage.java | 39 ++++++++
.../messaging/handler/SplitMessageHandler.java | 42 ++++++++
.../messaging/data/DynamicWorkUnitSerdeTest.java | 64 ++++++++++++
9 files changed, 534 insertions(+)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java
new file mode 100644
index 000000000..ffee68b3e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Callback that a {@link MessageBuffer} invokes with batches of {@link DynamicWorkUnitMessage}
+ * sent by a {@link DynamicWorkUnitProducer}. The consumer holds no business logic of its own;
+ * it only forwards each message to the registered {@link DynamicWorkUnitMessage.Handler}s.<br><br>
+ *
+ * Register an instance with {@link MessageBuffer#subscribe(Consumer)}; the buffer then delivers
+ * messages by calling {@link DynamicWorkUnitConsumer#accept(List)}.<br><br>
+ *
+ * Every message is offered to every handler through
+ * {@link DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)}.
+ */
+public class DynamicWorkUnitConsumer implements Consumer<List<DynamicWorkUnitMessage>> {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicWorkUnitConsumer.class);
+  private final List<DynamicWorkUnitMessage.Handler> messageHandlers;
+
+  /**
+   * @param handlers handlers that will be offered every consumed message; the collection is copied,
+   *                 so handlers are invoked in its iteration order at construction time
+   */
+  public DynamicWorkUnitConsumer(Collection<DynamicWorkUnitMessage.Handler> handlers) {
+    this.messageHandlers = new ArrayList<>(handlers);
+  }
+
+  /**
+   * Entry point for processing messages sent by {@link DynamicWorkUnitProducer} via {@link MessageBuffer}.
+   * Messages are processed in list order, and each one is passed to
+   * {@link DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)} of every handler supplied to the
+   * constructor. An exception thrown by a handler is not caught here, so it propagates to the caller and
+   * the remaining handlers and messages of the batch are skipped.
+   * @param messages batch of messages to dispatch
+   */
+  @Override
+  public void accept(List<DynamicWorkUnitMessage> messages) {
+    for (DynamicWorkUnitMessage message : messages) {
+      LOG.debug("{} handling message={}", DynamicWorkUnitConsumer.class.getSimpleName(), message);
+      this.messageHandlers.forEach(handler -> handler.handle(message));
+    }
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitProducer.java
new file mode 100644
index 000000000..c50e34162
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitProducer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging;
+
+import java.io.IOException;
+import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Producer runs in each task runner for sending {@link DynamicWorkUnitMessage} over a {@link MessageBuffer}
+ * to be consumed by {@link DynamicWorkUnitConsumer} in the AM<br><br>
+ *
+ * A {@link DynamicWorkUnitProducer} has a tight coupling with the {@link DynamicWorkUnitConsumer}
+ * since both producer / consumer should be using the same {@link MessageBuffer}
+ * from the same {@link MessageBuffer.Factory} and using the same channel name
+ */
+public class DynamicWorkUnitProducer {
+  // Bound to this class so log lines are attributed to the producer rather than the consumer.
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicWorkUnitProducer.class);
+  private final MessageBuffer<DynamicWorkUnitMessage> messageBuffer;
+
+  /**
+   * @param messageBuffer buffer that every message passed to {@link #produce(DynamicWorkUnitMessage)} is published to
+   */
+  public DynamicWorkUnitProducer(MessageBuffer<DynamicWorkUnitMessage> messageBuffer) {
+    this.messageBuffer = messageBuffer;
+  }
+
+  /**
+   * Send a {@link DynamicWorkUnitMessage} to be consumed by a {@link DynamicWorkUnitConsumer}.
+   * A failed publish is logged and reported through the return value instead of being rethrown.
+   * @param message Message to be sent over the message buffer
+   * @return {@code true} if {@link MessageBuffer#publish(Object)} completed normally, {@code false} if it
+   *         threw an {@link IOException}
+   * @throws IOException declared for source compatibility with existing callers; an {@link IOException}
+   *         raised by the buffer is caught in this method and converted to a {@code false} return value
+   */
+  public boolean produce(DynamicWorkUnitMessage message) throws IOException {
+    LOG.debug("Sending message over message buffer, messageBuffer={}, message={}",
+        messageBuffer.getClass().getSimpleName(), message);
+    try {
+      messageBuffer.publish(message);
+      return true;
+    } catch (IOException e) {
+      // Logged at WARN (not DEBUG): the message was dropped, which must be visible with default log levels.
+      LOG.warn("Failed to publish message. exception=", e);
+      return false;
+    }
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/MessageBuffer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/MessageBuffer.java
new file mode 100644
index 000000000..612986378
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/MessageBuffer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+/**
+ * Unidirectional buffer for sending messages from container to container.
+ * Example use case is for sending messages from taskrunner -> AM. If messages need to be
+ * sent bi-directionally, use multiple message buffers with different channel names.<br><br>
+ *
+ * {@link MessageBuffer} should only be instantiated using {@link MessageBuffer.Factory#getBuffer(String)} and not via constructor.
+ * A {@link MessageBuffer} will only communicate with other {@link MessageBuffer}(s) created by the same {@link MessageBuffer.Factory}
+ * and the same channel name.<br><br>
+ *
+ * Implementations of this interface are expected to have the following semantics:
+ * <ul>
+ * <li>Delivery order is not guaranteed</li>
+ * <li>A single reader calls the subscribe method</li>
+ * <li>Multiple concurrent writers may call publish</li>
+ * <li>Messages are delivered at most once</li>
+ * </ul>
+ *
+ * @param <T> type of the messages carried by the buffer
+ */
+public interface MessageBuffer<T> {
+ /**
+ * Alias for the message buffer. Message buffers will only communicate with other message buffers
+ * using the same channel name and coming from the same {@link MessageBuffer.Factory}
+ *
+ * i.e. When 2 containers use the same factory implementation to create a {@link MessageBuffer} with the same
+ * {@link MessageBuffer#getChannelName()}, they should be able to send messages uni-directionally from
+ * publisher to subscriber.
+ * @return channel name
+ */
+ String getChannelName();
+
+ /**
+ * Publish item to message buffer for consumption by subscribers. The method returns nothing;
+ * failure to add the item is signalled only through the exception.
+ * @param item item to publish to subscribers
+ * @throws IOException if unable to add message to buffer
+ */
+ void publish(T item) throws IOException;
+
+ /**
+ * Add callback {@link Consumer} object that will consume all published items of type {@code T}. It is safe to
+ * subscribe multiple consumer objects but only one container should be calling this method per channel. This is
+ * because the message buffer API supports at-most once delivery and one reader, so reads are destructive
+ * @param consumer callback that receives published items in batches (as a {@link List})
+ */
+ void subscribe(Consumer<List<T>> consumer);
+
+ /**
+ * Factory for instantiating {@link MessageBuffer} with a specific channel name. Message buffers produced by the same
+ * factory and using the same channel name are able to communicate with each other.
+ * @param <T> type of the messages carried by the buffers this factory creates
+ */
+ interface Factory<T> {
+ /**
+ * Create {@link MessageBuffer} with specific channel name. {@link MessageBuffer}(s) with the same channel name and
+ * from the same factory will exclusively communicate with each other.
+ * @param channelName channel namespace
+ * @return Message Buffer
+ */
+ MessageBuffer<T> getBuffer(String channelName);
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitDeserializationException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitDeserializationException.java
new file mode 100644
index 000000000..7da4fbf9e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitDeserializationException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+/**
+ * An exception when {@link DynamicWorkUnitMessage} cannot be correctly deserialized from underlying message storage.
+ * It extends {@link RuntimeException}, so it is unchecked and callers are not forced to declare or catch it.
+ */
+public class DynamicWorkUnitDeserializationException extends RuntimeException {
+ /**
+ * @param message description of why deserialization failed
+ */
+ public DynamicWorkUnitDeserializationException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message description of why deserialization failed
+ * @param e underlying cause, forwarded to {@link RuntimeException#RuntimeException(String, Throwable)}
+ */
+ public DynamicWorkUnitDeserializationException(String message, Throwable e) {
+ super(message, e);
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitMessage.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitMessage.java
new file mode 100644
index 000000000..731423b12
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+import gobblin.source.workunit.WorkUnit;
+
+/**
+ * Generic message for sending updates about a workunit during runtime. Implementations can
+ * extend this interface with other getters / properties to piggyback information specific to the message subtype.
+ *
+ * For example, the {@link SplitWorkUnitMessage} extends this interface by adding fields that are not specified in this
+ * interface.
+ */
+public interface DynamicWorkUnitMessage {
+ /**
+ * The WorkUnit Id this message is associated with. Same as {@link WorkUnit#getId()}
+ * @return WorkUnit Id
+ */
+ String getWorkUnitId();
+
+ /**
+ * Handler for processing messages and implementing business logic. A handler receives the
+ * generic {@link DynamicWorkUnitMessage} type, so an implementation interested in one message
+ * subtype has to check the runtime type itself.
+ */
+ interface Handler {
+ /**
+ * Process this message by handling business logic
+ * @param message message to process
+ */
+ void handle(DynamicWorkUnitMessage message);
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerde.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerde.java
new file mode 100644
index 000000000..f32cccc7d
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerde.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.nio.charset.Charset;
+import lombok.NonNull;
+
+/**
+ * Library for serializing / deserializing {@link DynamicWorkUnitMessage}. This solution maintains implementation
+ * specific fields not specified in the interface and is dependent on {@link Gson} to do the serialization / deserialization.
+ * The fully qualified name of the message implementation is stored in the JSON under an extra key so that
+ * deserialization can restore the concrete type.
+ */
+public final class DynamicWorkUnitSerde {
+  private static final Gson GSON = new Gson();
+  private static final String PROPS_PREFIX = "DynamicWorkUnit.Props";
+  // JSON key holding the fully qualified class name of the serialized message implementation.
+  private static final String MESSAGE_IMPLEMENTATION = PROPS_PREFIX + ".MessageImplementationClass";
+  private static final Charset DEFAULT_CHAR_ENCODING = Charsets.UTF_8;
+
+  // Suppresses default constructor, ensuring non-instantiability.
+  private DynamicWorkUnitSerde() { }
+
+  /**
+   * Serialize message into bytes. To deserialize, use {@link DynamicWorkUnitSerde#deserialize(byte[])}.
+   * Serialization preserves underlying properties of the {@link DynamicWorkUnitMessage} implementation.<br><br>
+   * For example, the {@link SplitWorkUnitMessage} implements
+   * {@link DynamicWorkUnitMessage} and has implementation specific properties such as
+   * {@link SplitWorkUnitMessage#getLaggingTopicPartitions()}. These properties will be maintained after serde.
+   * @param msg message to serialize
+   * @return UTF-8 encoded JSON representation of the message
+   * @throws NullPointerException if {@code msg} is null
+   */
+  public static byte[] serialize(DynamicWorkUnitMessage msg) {
+    Preconditions.checkNotNull(msg, "Input message cannot be null");
+    return toJsonObject(msg)
+        .toString()
+        .getBytes(DEFAULT_CHAR_ENCODING);
+  }
+
+  /**
+   * Deserialize bytes into message object. Input message byte array should have been serialized using
+   * {@link DynamicWorkUnitSerde#serialize(DynamicWorkUnitMessage)}.
+   * @param serializedMessage message that has been serialized by {@link DynamicWorkUnitSerde#serialize(DynamicWorkUnitMessage)}
+   * @return DynamicWorkUnitMessage object
+   * @throws NullPointerException if {@code serializedMessage} is null
+   * @throws DynamicWorkUnitDeserializationException if the JSON lacks the implementation-class key, or the named
+   *         class cannot be loaded or is not a {@link DynamicWorkUnitMessage}
+   */
+  public static DynamicWorkUnitMessage deserialize(byte[] serializedMessage) {
+    Preconditions.checkNotNull(serializedMessage, "Serialized message cannot be null");
+    String json = new String(serializedMessage, DEFAULT_CHAR_ENCODING);
+    JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
+    return toDynamicWorkUnitMessage(jsonObject);
+  }
+
+  /**
+   * Helper method for deserializing {@link JsonObject} to {@link DynamicWorkUnitMessage}
+   * @param json Message serialized using {@link DynamicWorkUnitSerde#toJsonObject}
+   * @return {@link DynamicWorkUnitMessage} POJO representation of the given json
+   * @throws DynamicWorkUnitDeserializationException if the implementation-class key is missing or does not name
+   *         a loadable {@link DynamicWorkUnitMessage} implementation
+   */
+  private static DynamicWorkUnitMessage toDynamicWorkUnitMessage(JsonObject json) {
+    Preconditions.checkNotNull(json, "Serialized msg cannot be null");
+    if (!json.has(MESSAGE_IMPLEMENTATION)) {
+      throw new DynamicWorkUnitDeserializationException(
+          String.format("Unable to deserialize json to %s. Ensure that %s "
+              + "is used for serialization. %s does not have the key=%s used for deserializing to correct message "
+              + "implementation. json=%s",
+              DynamicWorkUnitMessage.class.getSimpleName(),
+              "DynamicWorkUnitSerde#serialize(DynamicWorkUnitMessage msg)",
+              json.getClass().getSimpleName(),
+              MESSAGE_IMPLEMENTATION,
+              json));
+    }
+
+    Class<? extends DynamicWorkUnitMessage> clazz;
+    try {
+      // The class name comes from the payload, so load it WITHOUT running static initializers and verify it
+      // is a DynamicWorkUnitMessage before Gson is allowed to instantiate it.
+      clazz = Class.forName(json.get(MESSAGE_IMPLEMENTATION).getAsString(), false,
+          DynamicWorkUnitSerde.class.getClassLoader()).asSubclass(DynamicWorkUnitMessage.class);
+    } catch (ClassNotFoundException | ClassCastException e) {
+      throw new DynamicWorkUnitDeserializationException(
+          String.format("Input param %s contains invalid value for key=%s. This can be caused by the deserializer having"
+              + " different dependencies from the serializer. json=%s",
+              json.getClass(),
+              MESSAGE_IMPLEMENTATION,
+              json), e);
+    }
+    // Gson ignores the extra implementation-class property because the target class has no matching field.
+    return GSON.fromJson(json, clazz);
+  }
+
+  /**
+   * Helper method for serializing {@link DynamicWorkUnitMessage} to {@link JsonObject}. The runtime class name of
+   * the message is added under the implementation-class key so that it can be restored on deserialization.
+   * @param msg Message to serialize
+   * @return json representation of message
+   */
+  private static JsonObject toJsonObject(@NonNull DynamicWorkUnitMessage msg) {
+    Preconditions.checkNotNull(msg, "Input message cannot be null");
+    JsonElement json = GSON.toJsonTree(msg);
+    JsonObject obj = json.getAsJsonObject();
+    obj.addProperty(MESSAGE_IMPLEMENTATION, msg.getClass().getName());
+    return obj;
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/SplitWorkUnitMessage.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/SplitWorkUnitMessage.java
new file mode 100644
index 000000000..b0cf7e0cc
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/SplitWorkUnitMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+import java.util.List;
+import lombok.Builder;
+import lombok.Value;
+
+/**
+ * Message for the task runner to request the AM to split a workunit into multiple workunits
+ * because a subset of topic partitions are lagging in the specified workunit.<br><br>
+ *
+ * Accessors and the builder are generated by Lombok ({@code @Value}, {@code @Builder}); instances are
+ * created with {@code SplitWorkUnitMessage.builder().workUnitId(...).laggingTopicPartitions(...).build()}.
+ */
+@Value
+@Builder
+public class SplitWorkUnitMessage implements DynamicWorkUnitMessage {
+ /**
+ * Workunit ID of the work unit that should be split into multiple smaller workunits.
+ * Its generated getter provides {@link DynamicWorkUnitMessage#getWorkUnitId()}.
+ */
+ String workUnitId;
+
+ /**
+ * Topic partitions that have been lagging in the workunit
+ */
+ List<String> laggingTopicPartitions;
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/handler/SplitMessageHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/handler/SplitMessageHandler.java
new file mode 100644
index 000000000..d917b0a7f
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/handler/SplitMessageHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.handler;
+
+import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.apache.gobblin.runtime.messaging.data.SplitWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler that runs on the AM for processing {@link SplitWorkUnitMessage} and re-computing
+ * workunit to multiple smaller work units.
+ */
+public class SplitMessageHandler implements DynamicWorkUnitMessage.Handler {
+ private static final Logger LOG = LoggerFactory.getLogger(SplitMessageHandler.class);
+
+ @Override
+ public void handle(DynamicWorkUnitMessage message) {
+ if (message instanceof SplitWorkUnitMessage) {
+ handleSplit((SplitWorkUnitMessage) message);
+ }
+ }
+
+ private void handleSplit(SplitWorkUnitMessage message) {
+ //TODO: GOBBLIN-1688 Recompute workunit based on message.
+ LOG.info("Handling {}, message={}", SplitWorkUnitMessage.class.getSimpleName(), message);
+ }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerdeTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerdeTest.java
new file mode 100644
index 000000000..d9d60071c
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerdeTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+import com.google.gson.Gson;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import lombok.extern.java.Log;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.*;
+
+/**
+ * Round-trip and failure-path tests for {@link DynamicWorkUnitSerde}.
+ */
+@Log
+@Test
+public class DynamicWorkUnitSerdeTest {
+  /** Builds the {@link SplitWorkUnitMessage} fixture used by every test in this class. */
+  private static DynamicWorkUnitMessage sampleSplitMessage() {
+    return SplitWorkUnitMessage.builder()
+        .workUnitId("workUnitId")
+        .laggingTopicPartitions(Arrays.asList("topic-1","topic-2"))
+        .build();
+  }
+
+  /** A message serialized by the serde must deserialize to an equal object of the same implementation type. */
+  @Test
+  public void testSerialization() {
+    DynamicWorkUnitMessage original = sampleSplitMessage();
+
+    DynamicWorkUnitMessage roundTripped =
+        DynamicWorkUnitSerde.deserialize(DynamicWorkUnitSerde.serialize(original));
+
+    assertTrue(roundTripped instanceof SplitWorkUnitMessage);
+    assertEquals(original, roundTripped);
+  }
+
+  /** Bytes that were not produced by the serde lack the implementation-class key and must be rejected. */
+  @Test(expectedExceptions = DynamicWorkUnitDeserializationException.class)
+  public void testSerializationFails() {
+    // Serializing without using the DynamicWorkUnitSerde#serialize method should cause a runtime exception
+    // when deserializing
+    byte[] plainGsonBytes = new Gson().toJson(sampleSplitMessage()).getBytes(StandardCharsets.UTF_8);
+
+    try {
+      DynamicWorkUnitSerde.deserialize(plainGsonBytes);
+    } catch (DynamicWorkUnitDeserializationException e) {
+      log.info("Successfully threw exception when failing to deserialize. exception=" + e);
+      throw e;
+    }
+  }
+}