You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/09/12 16:56:55 UTC

[gobblin] branch master updated: [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d0e530a78 [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
d0e530a78 is described below

commit d0e530a78ae0417ea6e3ab6aeaeb4e2f260e227f
Author: Matthew Ho <ho...@gmail.com>
AuthorDate: Mon Sep 12 09:56:48 2022 -0700

    [GOBBLIN-1673][GOBBLIN-1683] Skeleton code for handling messages between task runner / application master for Dynamic work unit allocation (#3539)
    
    * [GOBBLIN-1673] Schema for dynamic work unit message
    
    * [GOBBLIN-1683] Dynamic Work Unit messaging abstractions
---
 .../runtime/messaging/DynamicWorkUnitConsumer.java |  65 ++++++++++++
 .../runtime/messaging/DynamicWorkUnitProducer.java |  56 +++++++++++
 .../gobblin/runtime/messaging/MessageBuffer.java   |  82 +++++++++++++++
 .../DynamicWorkUnitDeserializationException.java   |  30 ++++++
 .../messaging/data/DynamicWorkUnitMessage.java     |  45 +++++++++
 .../messaging/data/DynamicWorkUnitSerde.java       | 111 +++++++++++++++++++++
 .../messaging/data/SplitWorkUnitMessage.java       |  39 ++++++++
 .../messaging/handler/SplitMessageHandler.java     |  42 ++++++++
 .../messaging/data/DynamicWorkUnitSerdeTest.java   |  64 ++++++++++++
 9 files changed, 534 insertions(+)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java
new file mode 100644
index 000000000..ffee68b3e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitConsumer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Receives {@link DynamicWorkUnitMessage} sent by {@link DynamicWorkUnitProducer}.
+ * The class is used as a callback for the {@link MessageBuffer}. All business logic
+ * is done in the {@link DynamicWorkUnitMessage.Handler}.<br><br>
+ *
+ * {@link DynamicWorkUnitConsumer#accept(List)} is the entrypoint for processing {@link DynamicWorkUnitMessage}
+ * received by the {@link MessageBuffer} after calling {@link MessageBuffer#subscribe(Consumer)}<br><br>
+ *
+ * Each newly published {@link DynamicWorkUnitMessage} is passed to a {@link DynamicWorkUnitMessage.Handler}
+ * and will call {@link DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)} to do business logic
+ */
+public class DynamicWorkUnitConsumer implements Consumer<List<DynamicWorkUnitMessage>> {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicWorkUnitConsumer.class);
+  private final List<DynamicWorkUnitMessage.Handler> messageHandlers;
+
+  public DynamicWorkUnitConsumer(Collection<DynamicWorkUnitMessage.Handler> handlers) {
+    this.messageHandlers = new ArrayList<>(handlers);
+  }
+
+  /**
+   * Entry point for processing messages sent by {@link DynamicWorkUnitProducer} via {@link MessageBuffer}. This
+   * calls {@link DynamicWorkUnitMessage.Handler#handle(DynamicWorkUnitMessage)} method for each handler added via
+   * {@link DynamicWorkUnitConsumer#DynamicWorkUnitConsumer(Collection<DynamicWorkUnitMessage.Handler>)
+   */
+  @Override
+  public void accept(List<DynamicWorkUnitMessage> messages) {
+    for (DynamicWorkUnitMessage msg : messages) {
+      handleMessage(msg);
+    }
+  }
+
+  private void handleMessage(DynamicWorkUnitMessage msg) {
+    LOG.debug("{} handling message={}", DynamicWorkUnitConsumer.class.getSimpleName(), msg);
+    for (DynamicWorkUnitMessage.Handler handler : this.messageHandlers) {
+      handler.handle(msg);
+    }
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitProducer.java
new file mode 100644
index 000000000..c50e34162
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/DynamicWorkUnitProducer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging;
+
+import java.io.IOException;
+import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Producer runs in each task runner for sending {@link DynamicWorkUnitMessage} over a {@link MessageBuffer}
+ * to be consumed by {@link DynamicWorkUnitConsumer} in the AM<br><br>
+ *
+ * A {@link DynamicWorkUnitProducer} has a tight coupling with the {@link DynamicWorkUnitConsumer}
+ * since both producer / consumer should be using the same {@link MessageBuffer}
+ * from the same {@link MessageBuffer.Factory} and using the same channel name
+ */
+public class DynamicWorkUnitProducer {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicWorkUnitConsumer.class);
+  private final MessageBuffer<DynamicWorkUnitMessage> messageBuffer;
+
+  public DynamicWorkUnitProducer(MessageBuffer<DynamicWorkUnitMessage> messageBuffer) {
+    this.messageBuffer = messageBuffer;
+  }
+
+  /**
+   * Send a {@link DynamicWorkUnitMessage} to be consumed by a {@link DynamicWorkUnitConsumer}
+   * @param message Message to be sent over the message buffer
+   */
+  public boolean produce(DynamicWorkUnitMessage message) throws IOException {
+    LOG.debug("Sending message over message buffer, messageBuffer={}, message={}",
+        messageBuffer.getClass().getSimpleName(), message);
+    try {
+      messageBuffer.publish(message);
+      return true;
+    } catch (IOException e) {
+      LOG.debug("Failed to publish message. exception=", e);
+      return false;
+    }
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/MessageBuffer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/MessageBuffer.java
new file mode 100644
index 000000000..612986378
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/MessageBuffer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Consumer;
+
+
+/**
+ * Unidirectional buffer for sending messages from container to container.
+ * Example use case is for sending messages from taskrunner -> AM. If messages need to be
+ * sent bi-directionally, use multiple message buffers with different channel names.<br><br>
+ *
+ * {@link MessageBuffer} should only be instantiated using {@link MessageBuffer.Factory#getBuffer(String)} and not via constructor.
+ * A {@link MessageBuffer} will only communicate with other {@link MessageBuffer}(s) created by the same {@link MessageBuffer.Factory}
+ * and the same channel name.
+ *
+ * This interface provides the following guarantees:
+ * <ul>
+ *   <li>No guaranteed order delivery</li>
+ *   <li>Single reader calling subscribe method</li>
+ *   <li>Multiple concurrent writers calling publish</li>
+ *   <li>Messages delivered at most once</li>
+ * </ul>
+ */
+public interface MessageBuffer<T> {
+  /**
+   * Alias for the message buffer. Message buffers will only communicate with other message buffers
+   * using the same channel name and coming from the same {@link MessageBuffer.Factory}
+   *
+   * i.e. When 2 containers use the same factory implementation to create a {@link MessageBuffer} with the same
+   * {@link MessageBuffer#getChannelName()}, they should be able to send messages uni-directionally from
+   * publisher to subscriber.
+   * @return channel name
+   */
+  String getChannelName();
+
+  /**
+   * Publish item to message buffer for consumption by subscribers
+   * @param item item to publish to subscribers
+   * @return Is item successfully added to buffer
+   * @throws IOException if unable to add message to buffer
+   */
+  void publish(T item) throws IOException;
+
+  /**
+   * Add callback {@link Consumer} object that will consume all published {@link T}. It is safe to subscribe multiple
+   * consumer objects but only one container should be calling this method per channel. This is
+   * because the message buffer API supports at-most once delivery and one reader, so reads are destructive
+   */
+  void subscribe(Consumer<List<T>> consumer);
+
+  /**
+   * Factory for instantiating {@link MessageBuffer} with a specific channel name. Message buffers produced by the same
+   * factory and using the same channel name are able to communicate with each other.
+   * @param <T>
+   */
+  interface Factory<T> {
+    /**
+     * Create {@link MessageBuffer} with specific channel name. {@link MessageBuffer}(s) with the same channel name and
+     * from the same factory will exclusively communicate with eachother.
+     * @param channelName channel namespace
+     * @return Message Buffer
+     */
+    MessageBuffer<T> getBuffer(String channelName);
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitDeserializationException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitDeserializationException.java
new file mode 100644
index 000000000..7da4fbf9e
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitDeserializationException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+/**
+ * An exception when {@link DynamicWorkUnitMessage} cannot be correctly deserialized from underlying message storage
+ */
+public class DynamicWorkUnitDeserializationException extends RuntimeException {
+  public DynamicWorkUnitDeserializationException(String message) {
+    super(message);
+  }
+
+  public DynamicWorkUnitDeserializationException(String message, Throwable e) {
+    super(message, e);
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitMessage.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitMessage.java
new file mode 100644
index 000000000..731423b12
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+import gobblin.source.workunit.WorkUnit;
+
+/**
+ * Generic message for sending updates about a workunit during runtime. Implementations can
+ * extend this interface with other getters / properties to piggyback information specific to the message subtype.
+ *
+ * For example, the {@link SplitWorkUnitMessage} extends this interface by adding fields that are not specified in this
+ * interface.
+ */
+public interface DynamicWorkUnitMessage {
+  /**
+   * The WorkUnit Id this message is associated with. Same as {@link WorkUnit#getId()}
+   * @return WorkUnit Id
+   */
+  String getWorkUnitId();
+
+  /**
+   * Handler for processing messages and implementing business logic
+   */
+  interface Handler {
+    /**
+     * Process this message by handling business logic
+     * @param message
+     */
+    void handle(DynamicWorkUnitMessage message);
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerde.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerde.java
new file mode 100644
index 000000000..f32cccc7d
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerde.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import java.nio.charset.Charset;
+import lombok.NonNull;
+
+/**
+ * Library for serializing / deserializing {@link DynamicWorkUnitMessage}. This solution maintains implementation
+ * specific fields not specified in the interface and is dependent on {@link Gson} to do the serialization / deserialization
+ */
+public final class DynamicWorkUnitSerde {
+  private static final Gson GSON = new Gson();
+  private static final String PROPS_PREFIX = "DynamicWorkUnit.Props";
+  private static final String MESSAGE_IMPLEMENTATION = PROPS_PREFIX + ".MessageImplementationClass";
+  private static final Charset DEFAULT_CHAR_ENCODING = Charsets.UTF_8;
+
+  // Suppresses default constructor, ensuring non-instantiability.
+  private DynamicWorkUnitSerde() { }
+
+  /**
+   * Serialize message into bytes. To deserialize, use {@link DynamicWorkUnitSerde#deserialize(byte[])}.
+   * Serialization preserves underlying properties of the {@link DynamicWorkUnitMessage} implementation.<br><br>
+   * For example, the {@link SplitWorkUnitMessage} implements
+   * {@link DynamicWorkUnitMessage} and has implementation specific properties such as
+   * {@link SplitWorkUnitMessage#getLaggingTopicPartitions()}. These properties will be maintained after serde.
+   * @param msg message to serialize
+   * @return message as bytes
+   */
+  public static byte[] serialize(DynamicWorkUnitMessage msg) {
+    Preconditions.checkNotNull(msg, "Input message cannot be null");
+    return toJsonObject(msg)
+        .toString()
+        .getBytes(DEFAULT_CHAR_ENCODING);
+  }
+
+  /**
+   * Deserialize bytes into message object. Input message byte array should have been serialized using
+   * {@link DynamicWorkUnitSerde#serialize(DynamicWorkUnitMessage)}.
+   * @param serializedMessage message that has been serialized by {@link DynamicWorkUnitSerde#serialize(DynamicWorkUnitMessage)}
+   * @return DynamicWorkUnitMessage object
+   */
+  public static DynamicWorkUnitMessage deserialize(byte[] serializedMessage) {
+    String json = new String(serializedMessage, DEFAULT_CHAR_ENCODING);
+    JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
+    return toDynamicWorkUnitMessage(jsonObject);
+  }
+
+  /**
+   * Helper method for deserializing {@link JsonObject} to {@link DynamicWorkUnitMessage}
+   * @param json Message serialized using {@link DynamicWorkUnitSerde#toJsonObject}
+   * @return {@link DynamicWorkUnitMessage} POJO representation of the given json
+   */
+  private static <T extends DynamicWorkUnitMessage> DynamicWorkUnitMessage toDynamicWorkUnitMessage(JsonObject json) {
+    Preconditions.checkNotNull(json, "Serialized msg cannot be null");
+    try {
+      if (!json.has(MESSAGE_IMPLEMENTATION)) {
+        throw new DynamicWorkUnitDeserializationException(
+            String.format("Unable to deserialize json to %s. Ensure that %s "
+                    + "is used for serialization. %s does not have the key=%s used for deserializing to correct message "
+                    + "implementation. json=%s",
+                DynamicWorkUnitMessage.class.getSimpleName(),
+                "DynamicWorkSerde#serialize(DynamicWorkUnitMessage msg)",
+                json.getClass().getSimpleName(),
+                MESSAGE_IMPLEMENTATION,
+                json));
+      }
+      Class<T> clazz = (Class<T>) Class.forName(json.get(MESSAGE_IMPLEMENTATION).getAsString());
+      return GSON.fromJson(json, clazz);
+    } catch (ClassNotFoundException e) {
+      throw new DynamicWorkUnitDeserializationException(
+          String.format("Input param %s contains invalid value for key=%s. This can be caused by the deserializer having"
+                  + " different dependencies from the serializer. json=%s",
+              json.getClass(),
+              MESSAGE_IMPLEMENTATION,
+              json), e);
+    }
+  }
+
+  /**
+   * Helper method for serializing {@link DynamicWorkUnitMessage} to {@link JsonObject}
+   * @param msg Message to serialize
+   * @return json representation of message
+   */
+  private static JsonObject toJsonObject(@NonNull DynamicWorkUnitMessage msg) {
+    Preconditions.checkNotNull(msg, "Input message cannot be null");
+    JsonElement json = GSON.toJsonTree(msg);
+    JsonObject obj = json.getAsJsonObject();
+    obj.addProperty(MESSAGE_IMPLEMENTATION, msg.getClass().getName());
+    return obj;
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/SplitWorkUnitMessage.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/SplitWorkUnitMessage.java
new file mode 100644
index 000000000..b0cf7e0cc
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/data/SplitWorkUnitMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+import java.util.List;
+import lombok.Builder;
+import lombok.Value;
+
+/**
+ * Message for the task runner to request the AM to split a workunit into multiple workunits
+ * because a subset of topic partitions are lagging in the specified workunit.
+ */
+@Value
+@Builder
+public class SplitWorkUnitMessage implements DynamicWorkUnitMessage {
+  /**
+   * Workunit ID of the work unit that should be split into multiple smaller workunits
+   */
+  String workUnitId;
+
+  /**
+   * Topic partitions that have been lagging in the workunit
+   */
+  List<String> laggingTopicPartitions;
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/handler/SplitMessageHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/handler/SplitMessageHandler.java
new file mode 100644
index 000000000..d917b0a7f
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/messaging/handler/SplitMessageHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.handler;
+
+import org.apache.gobblin.runtime.messaging.data.DynamicWorkUnitMessage;
+import org.apache.gobblin.runtime.messaging.data.SplitWorkUnitMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handler that runs on the AM for processing {@link SplitWorkUnitMessage} and re-computing
+ * workunit to multiple smaller work units.
+ */
+public class SplitMessageHandler implements DynamicWorkUnitMessage.Handler {
+  private static final Logger LOG = LoggerFactory.getLogger(SplitMessageHandler.class);
+
+  @Override
+  public void handle(DynamicWorkUnitMessage message) {
+    if (message instanceof SplitWorkUnitMessage) {
+      handleSplit((SplitWorkUnitMessage) message);
+    }
+  }
+
+  private void handleSplit(SplitWorkUnitMessage message) {
+    //TODO: GOBBLIN-1688 Recompute workunit based on message.
+    LOG.info("Handling {}, message={}", SplitWorkUnitMessage.class.getSimpleName(), message);
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerdeTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerdeTest.java
new file mode 100644
index 000000000..d9d60071c
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/messaging/data/DynamicWorkUnitSerdeTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime.messaging.data;
+
+import com.google.gson.Gson;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import lombok.extern.java.Log;
+import org.testng.annotations.Test;
+
+import static org.testng.AssertJUnit.*;
+
+@Log
+@Test
+public class DynamicWorkUnitSerdeTest {
+  @Test
+  public void testSerialization() {
+    DynamicWorkUnitMessage msg = SplitWorkUnitMessage.builder()
+        .workUnitId("workUnitId")
+        .laggingTopicPartitions(Arrays.asList("topic-1","topic-2"))
+        .build();
+
+    byte[] serializedMsg = DynamicWorkUnitSerde.serialize(msg);
+
+    DynamicWorkUnitMessage deserializedMsg = DynamicWorkUnitSerde.deserialize(serializedMsg);
+
+    assertTrue(deserializedMsg instanceof SplitWorkUnitMessage);
+    assertEquals(msg, deserializedMsg);
+  }
+
+  @Test(expectedExceptions = DynamicWorkUnitDeserializationException.class)
+  public void testSerializationFails() {
+    DynamicWorkUnitMessage msg = SplitWorkUnitMessage.builder()
+        .workUnitId("workUnitId")
+        .laggingTopicPartitions(Arrays.asList("topic-1","topic-2"))
+        .build();
+
+    // Serializing without using the DynamicWorkUnitSerde#serialize method should cause a runtime exception
+    // when deserializing
+    Gson gson = new Gson();
+    byte[] serializedMsg = gson.toJson(msg).getBytes(StandardCharsets.UTF_8);
+
+    try {
+      DynamicWorkUnitMessage failsToDeserialize = DynamicWorkUnitSerde.deserialize(serializedMsg);
+    } catch(DynamicWorkUnitDeserializationException e) {
+      log.info("Successfully threw exception when failing to deserialize. exception=" + e);
+      throw e;
+    }
+  }
+}