Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/05 18:03:49 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-673] Implement a FS based JobStatusRetriever for GaaS Flows.

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9040c7a  [GOBBLIN-673] Implement a FS based JobStatusRetriever for GaaS Flows.
9040c7a is described below

commit 9040c7a7e8215fbdd3f3840a0ca39dd4eea4ee4f
Author: suvasude <su...@linkedin.biz>
AuthorDate: Tue Feb 5 10:03:42 2019 -0800

    [GOBBLIN-673] Implement a FS based JobStatusRetriever for GaaS Flows.
    
    Closes #2545 from sv2000/kafkaTracking
---
 .../metastore/FileContextBasedFsStateStore.java    |  83 +++++++
 .../FileContextBasedFsStateStoreFactory.java       |  56 +++++
 .../org/apache/gobblin/metastore/FsStateStore.java |  25 +-
 .../gobblin/metastore/util/StateStoreCleaner.java  |   3 +-
 .../metastore/util/StateStoreCleanerRunnable.java  |  58 +++++
 .../apache/gobblin/metrics/event/TimingEvent.java  |   3 +
 gobblin-modules/gobblin-kafka-08/build.gradle      |  10 +
 .../gobblin/runtime/kafka/HighLevelConsumer.java   |  19 +-
 .../service/monitoring/JobStatusRetriever.java     |   3 +
 gobblin-service/build.gradle                       |   3 +
 .../modules/core/GobblinServiceManager.java        |   1 -
 .../service/modules/orchestration/DagManager.java  | 125 +++++++---
 .../service/monitoring/FsJobStatusRetriever.java   | 170 +++++++++++++
 .../monitoring/KafkaAvroJobStatusMonitor.java      | 160 ++++++++++++
 .../service/monitoring/KafkaJobStatusMonitor.java  | 137 +++++++++++
 .../monitoring/KafkaJobStatusMonitorFactory.java   |  67 ++++++
 .../monitoring/FsJobStatusRetrieverTest.java       | 169 +++++++++++++
 .../monitoring/KafkaAvroJobStatusMonitorTest.java  | 267 +++++++++++++++++++++
 18 files changed, 1308 insertions(+), 51 deletions(-)

diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
new file mode 100644
index 0000000..ade7a50
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.HadoopUtils;
+
+
+/**
+ * An implementation of {@link StateStore} backed by a {@link FileSystem}.
+ *
+ * <p>
+ *     This implementation extends {@link FsStateStore} to use
+ *     {@link org.apache.hadoop.fs.FileContext} APIs to persist state to the state store.
+ *     The advantage of using {@link org.apache.hadoop.fs.FileContext} is that it provides an
+ *     atomic rename-with-overwrite option which allows atomic update to a previously written
+ *     state file.
+ * </p>
+ *
+ * @param <T> state object type
+ *
+ * @author Sudarshan Vasudevan
+ */
+
+public class FileContextBasedFsStateStore<T extends State> extends FsStateStore<T> {
+  private FileContext fc;
+
+  public FileContextBasedFsStateStore(String fsUri, String storeRootDir, Class<T> stateClass)
+      throws IOException {
+    super(fsUri, storeRootDir, stateClass);
+    this.fc = FileContext.getFileContext(URI.create(fsUri));
+  }
+
+  public FileContextBasedFsStateStore(FileSystem fs, String storeRootDir, Class<T> stateClass)
+      throws UnsupportedFileSystemException {
+    super(fs, storeRootDir, stateClass);
+    this.fc = FileContext.getFileContext(this.fs.getUri());
+  }
+
+  public FileContextBasedFsStateStore(String storeUrl, Class<T> stateClass)
+      throws IOException {
+    super(storeUrl, stateClass);
+    this.fc = FileContext.getFileContext(this.fs.getUri());
+  }
+
+  /**
+   * See {@link StateStore#put(String, String, T)}.
+   *
+   * <p>
+   *   This implementation uses {@link FileContext#rename(Path, Path, Options.Rename...)}, with
+   *   {@link org.apache.hadoop.fs.Options.Rename#OVERWRITE} set to true, to write the
+   *   state to the underlying state store.
+   * </p>
+   */
+  @Override
+  protected void renamePath(Path tmpTablePath, Path tablePath) throws IOException {
+    HadoopUtils.renamePath(this.fc, tmpTablePath, tablePath, true);
+  }
+}
\ No newline at end of file
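
The atomic rename-with-overwrite described in the class javadoc maps directly onto the
Hadoop FileContext API that HadoopUtils.renamePath delegates to. A minimal standalone
sketch, with local-filesystem paths chosen purely for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;

    public class AtomicRenameSketch {
      public static void main(String[] args) throws Exception {
        FileContext fc = FileContext.getFileContext(new Configuration());
        // Unlike FileSystem#rename, Rename.OVERWRITE atomically replaces the destination
        // if it already exists, which is what makes in-place updates of a previously
        // written state file safe.
        fc.rename(new Path("/tmp/state.tmp"), new Path("/tmp/state"),
            Options.Rename.OVERWRITE);
      }
    }
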
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStoreFactory.java
new file mode 100644
index 0000000..9c68cc9
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStoreFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class FileContextBasedFsStateStoreFactory implements StateStore.Factory {
+  @Override
+  public <T extends State> StateStore<T> createStateStore(Config config, Class<T> stateClass) {
+    // Add all job configuration properties so they are picked up by Hadoop
+    Configuration conf = new Configuration();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue().unwrapped().toString());
+    }
+
+    try {
+      String stateStoreFsUri = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_FS_URI_KEY,
+          ConfigurationKeys.LOCAL_FS_URI);
+      FileSystem stateStoreFs = FileSystem.get(URI.create(stateStoreFsUri), conf);
+      String stateStoreRootDir = config.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY);
+
+      return new FileContextBasedFsStateStore<T>(stateStoreFs, stateStoreRootDir, stateClass);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create FsStateStore with factory", e);
+    }
+
+  }
+}
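
For illustration, the factory can be driven from a typesafe Config. A minimal sketch,
assuming a local state store root directory; the store and table names are hypothetical:

    import com.google.common.collect.ImmutableMap;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.configuration.ConfigurationKeys;
    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
    import org.apache.gobblin.metastore.StateStore;

    public class StateStoreFactorySketch {
      public static void main(String[] args) throws Exception {
        Config config = ConfigFactory.parseMap(ImmutableMap.of(
            ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, "/tmp/gobblin/statestore"));
        StateStore<State> stateStore =
            new FileContextBasedFsStateStoreFactory().createStateStore(config, State.class);
        stateStore.put("myStore", "myTable", new State());  // names are hypothetical
      }
    }
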
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
index 4dd100d..34c0b8f 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
@@ -17,14 +17,11 @@
 
 package org.apache.gobblin.metastore;
 
-import static org.apache.gobblin.util.HadoopUtils.FS_SCHEMES_NON_ATOMIC;
-
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,14 +30,18 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.WritableShimSerialization;
 
+import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
-import com.google.common.base.Predicate;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.WritableShimSerialization;
+import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
+
+import static org.apache.gobblin.util.HadoopUtils.FS_SCHEMES_NON_ATOMIC;
 
 
 
@@ -73,7 +74,7 @@ public class FsStateStore<T extends State> implements StateStore<T> {
   protected final String storeRootDir;
 
   // Class of the state objects to be put into the store
-  private final Class<T> stateClass;
+  protected final Class<T> stateClass;
 
   public FsStateStore(String fsUri, String storeRootDir, Class<T> stateClass) throws IOException {
     this.conf = getConf(null);
@@ -174,7 +175,7 @@ public class FsStateStore<T extends State> implements StateStore<T> {
 
     if (this.useTmpFileForPut) {
       Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
-      HadoopUtils.renamePath(this.fs, tmpTablePath, tablePath);
+      renamePath(tmpTablePath, tablePath);
     }
   }
 
@@ -211,10 +212,14 @@ public class FsStateStore<T extends State> implements StateStore<T> {
 
     if (this.useTmpFileForPut) {
       Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
-      HadoopUtils.renamePath(this.fs, tmpTablePath, tablePath);
+      renamePath(tmpTablePath, tablePath);
     }
   }
 
+  protected void renamePath(Path tmpTablePath, Path tablePath) throws IOException {
+    HadoopUtils.renamePath(this.fs, tmpTablePath, tablePath);
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public T get(String storeName, String tableName, String stateId) throws IOException {
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleaner.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleaner.java
index c1c3a0b..0cdde49 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleaner.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleaner.java
@@ -137,7 +137,8 @@ public class StateStoreCleaner implements Closeable {
     public boolean accept(Path path) {
       String fileName = path.getName();
       String extension = Files.getFileExtension(fileName);
-      return isStateMetaFile(fileName) || extension.equalsIgnoreCase("jst") || extension.equalsIgnoreCase("tst");
+      return isStateMetaFile(fileName) || extension.equalsIgnoreCase("jst") || extension.equalsIgnoreCase("tst") ||
+          (extension.equalsIgnoreCase("gst"));
     }
 
     boolean isStateMetaFile(String fileName) {
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
new file mode 100644
index 0000000..cbc7f2e
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metastore.util;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A utility class that wraps the {@link StateStoreCleaner} implementation as a {@link Runnable}.
+ */
+@Slf4j
+public class StateStoreCleanerRunnable implements Runnable {
+  private Properties properties;
+
+  public StateStoreCleanerRunnable(Config config) {
+    this.properties = ConfigUtils.configToProperties(config);
+  }
+
+  public void run() {
+    Closer closer = Closer.create();
+    try {
+      log.info("Attempting to clean state store..");
+      closer.register(new StateStoreCleaner(properties)).run();
+      log.info("State store clean up successful.");
+    } catch (IOException | ExecutionException e) {
+      log.error("Exception encountered during execution of {}", StateStoreCleaner.class.getName());
+    } finally {
+      try {
+        closer.close();
+      } catch (IOException e) {
+        log.error("Exception when closing the closer", e);
+      }
+    }
+  }
+}
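
Because the wrapper is a plain Runnable, it drops straight into a ScheduledExecutorService.
A sketch using the same 300-second-delay, once-a-day cadence that KafkaJobStatusMonitor
adopts later in this commit:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;

    public class CleanerScheduleSketch {
      public static void main(String[] args) {
        Config config = ConfigFactory.empty();  // real usage supplies state store properties
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // First run after 5 minutes, then every 24 hours.
        scheduler.scheduleAtFixedRate(new StateStoreCleanerRunnable(config), 300, 86400L,
            TimeUnit.SECONDS);
      }
    }
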
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 0b3defa..3e0694d 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -68,6 +68,9 @@ public class TimingEvent {
     public static final String JOB_GROUP_FIELD = "jobGroup";
     public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
     public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
+    public static final String LOW_WATERMARK_FIELD = "lowWatermark";
+    public static final String HIGH_WATERMARK_FIELD = "highWatermark";
+    public static final String PROCESSED_COUNT_FIELD = "processedCount";
   }
 
   public static final String METADATA_START_TIME = "startTime";
diff --git a/gobblin-modules/gobblin-kafka-08/build.gradle b/gobblin-modules/gobblin-kafka-08/build.gradle
index 3314377..49c44a5 100644
--- a/gobblin-modules/gobblin-kafka-08/build.gradle
+++ b/gobblin-modules/gobblin-kafka-08/build.gradle
@@ -76,6 +76,16 @@ configurations {
   // HADOOP-5254 and MAPREDUCE-5664
   all*.exclude group: 'xml-apis'
   all*.exclude group: 'xerces'
+  tests
+}
+
+task testJar(type: Jar, dependsOn: testClasses) {
+  baseName = "test-${project.archivesBaseName}"
+  from sourceSets.test.output
+}
+
+artifacts {
+  tests testJar
 }
 
 test {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index da2ac81..ae21921 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -33,13 +33,6 @@ import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
-
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
@@ -50,6 +43,13 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
 
 /**
  * A high level consumer for Kafka topics. Subclasses should implement {@link HighLevelConsumer#processMessage(MessageAndMetadata)}
@@ -71,9 +71,9 @@ public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
   private static final String DEFAULT_GROUP_ID = "KafkaJobSpecMonitor";
 
   @Getter
-  private final String topic;
+  protected final String topic;
+  protected final Config config;
   private final int numThreads;
-  private final Config config;
   private final ConsumerConfig consumerConfig;
 
   /**
@@ -203,5 +203,4 @@ public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
       }
     }
   }
-
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index f64d136..59375b3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -29,6 +29,9 @@ import org.apache.gobblin.annotation.Alpha;
  */
 @Alpha
 public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker {
+  public static final String EVENT_NAME_FIELD = "eventName";
+  public static final String NA_KEY = "NA";
+  public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
 
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
       long flowExecutionId);
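
These three constants define the state-store naming scheme used throughout this commit:
"." joins the key parts, and "NA" stands in for a jobGroup/jobName that is not yet known.
A sketch of how the names are composed (flow and job identifiers are hypothetical):

    import com.google.common.base.Joiner;

    public class StateStoreNamingSketch {
      public static void main(String[] args) {
        // Store name: flowGroup.flowName
        String storeName = Joiner.on(".").join("myFlowGroup", "myFlowName");
        // Table name: flowExecutionId.jobGroup.jobName.gst
        String tableName = Joiner.on(".").join(1549389822L, "myJobGroup", "myJobName", "gst");
        System.out.println(storeName + " / " + tableName);
      }
    }
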
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 3182080..d7e8ea2 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -71,6 +71,8 @@ dependencies {
   compile externalDependency.zkClient
 
   testCompile project(":gobblin-example")
+  testCompile project(path: ":gobblin-modules:gobblin-kafka-08:", configuration: "tests")
+  testCompile project(":gobblin-test-utils")
   testCompile externalDependency.byteman
   testCompile externalDependency.bytemanBmunit
   testCompile externalDependency.calciteCore
@@ -81,6 +83,7 @@ dependencies {
   testCompile externalDependency.hamcrest
   testCompile externalDependency.jhyde
   testCompile externalDependency.mockito
+  testCompile externalDependency.kafka08Test
 }
 
 // Begin HACK to get around POM being dependent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a77833e..3e3429b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -87,7 +87,6 @@ import org.apache.gobblin.service.NoopRequesterService;
 import org.apache.gobblin.service.RequesterService;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f3335bc..04a4c4e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -18,10 +18,12 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -32,7 +34,6 @@ import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractIdleService;
@@ -55,8 +56,11 @@ import org.apache.gobblin.service.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -89,14 +93,16 @@ import static org.apache.gobblin.service.ExecutionStatus.valueOf;
 public class DagManager extends AbstractIdleService {
   public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
 
+  private static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+  private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
   private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
   private static final Integer DEFAULT_NUM_THREADS = 3;
   private static final Integer TERMINATION_TIMEOUT = 30;
-  private static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
   private static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads";
   private static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval";
-  private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
-  private static final String DAG_STORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
+  private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = JOB_STATUS_RETRIEVER_KEY + ".class";
+  private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
+  private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
 
   static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + "dagStateStoreDir";
 
@@ -132,7 +138,9 @@ public class DagManager extends AbstractIdleService {
   private final Integer numThreads;
   private final Integer pollingInterval;
   private final JobStatusRetriever jobStatusRetriever;
+  private final KafkaJobStatusMonitor jobStatusMonitor;
   private final DagStateStore dagStateStore;
+
   private volatile boolean isActive = false;
 
   public DagManager(Config config, boolean instrumentationEnabled) {
@@ -141,12 +149,22 @@ public class DagManager extends AbstractIdleService {
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.instrumentationEnabled = instrumentationEnabled;
+    boolean jobStatusMonitorEnabled =
+        ConfigUtils.getBoolean(config, KafkaJobStatusMonitor.JOB_STATUS_MONITOR_ENABLED_KEY, true);
 
     try {
-      Class jobStatusRetrieverClass = Class.forName(config.getString(JOB_STATUS_RETRIEVER_KEY));
+      Class jobStatusRetrieverClass;
+      if (jobStatusMonitorEnabled) {
+        jobStatusRetrieverClass = Class.forName(DEFAULT_JOB_STATUS_RETRIEVER_CLASS);
+        this.jobStatusMonitor = new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
+      } else {
+        jobStatusRetrieverClass = Class.forName(config.getString(JOB_STATUS_RETRIEVER_CLASS_KEY));
+        this.jobStatusMonitor = null;
+      }
       this.jobStatusRetriever =
-          (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
-      Class dagStateStoreClass = Class.forName(config.getString(DAG_STORE_CLASS_KEY));
+          (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass,
+              ConfigUtils.getConfigOrEmpty(config, JOB_STATUS_RETRIEVER_KEY));
+      Class dagStateStoreClass = Class.forName(config.getString(DAG_STATESTORE_CLASS_KEY));
       this.dagStateStore = (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config);
     } catch (ReflectiveOperationException e) {
       throw new RuntimeException("Exception encountered during DagManager initialization", e);
@@ -162,11 +180,7 @@ public class DagManager extends AbstractIdleService {
    */
   @Override
   protected void startUp() {
-    //On startup, the service creates tasks that are scheduled at a fixed rate.
-    for (int i = 0; i < numThreads; i++) {
-      this.scheduledExecutorPool.scheduleAtFixedRate(new DagManagerThread(jobStatusRetriever, dagStateStore, queue, instrumentationEnabled), 0, this.pollingInterval,
-          TimeUnit.SECONDS);
-    }
+    //Do nothing.
   }
 
   /**
@@ -190,15 +204,40 @@ public class DagManager extends AbstractIdleService {
    * @param active a boolean to indicate if the {@link DagManager} is the leader.
    */
   public synchronized void setActive(boolean active) {
+    if (this.isActive == active) {
+      log.info("DagManager already {}, skipping further actions.", (!active) ? "inactive" : "active");
+      return;
+    }
     this.isActive = active;
     try {
       if (this.isActive) {
+        log.info("Scheduling {} DagManager threads", numThreads);
+        //On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
+        for (int i = 0; i < numThreads; i++) {
+          this.scheduledExecutorPool.scheduleAtFixedRate(new DagManagerThread(jobStatusRetriever, dagStateStore, queue, instrumentationEnabled), 0, this.pollingInterval,
+              TimeUnit.SECONDS);
+        }
+        if ((this.jobStatusMonitor != null) && (!this.jobStatusMonitor.isRunning())) {
+          log.info("Starting job status monitor");
+          jobStatusMonitor.startAsync().awaitRunning();
+        }
         for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
           offer(dag);
         }
+      } else { //Mark the DagManager inactive.
+        log.info("Inactivating the DagManager. Shutting down all DagManager threads");
+        this.scheduledExecutorPool.shutdown();
+        try {
+          this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          log.error("Exception {} encountered when shutting down DagManager threads.", e);
+        }
+        log.info("Shutting down JobStatusMonitor");
+        if (this.jobStatusMonitor != null) { this.jobStatusMonitor.shutDown(); }
       }
     } catch (IOException e) {
-      throw new RuntimeException("Exception encountered when activating the new DagManager", e);
+      log.error("Exception encountered when activating the new DagManager", e);
+      throw new RuntimeException(e);
     }
   }
 
@@ -299,7 +338,10 @@ public class DagManager extends AbstractIdleService {
       }
       log.info("Dag {} submitting jobs ready for execution.", dagId);
       //Determine the next set of jobs to run and submit them for execution
-      submitNext(dagId);
+      Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
+      for (DagNode dagNode: nextSubmitted.get(dagId)) {
+        addJobState(dagId, dagNode);
+      }
       log.info("Dag {} Initialization complete.", dagId);
     }
 
@@ -310,31 +352,49 @@ public class DagManager extends AbstractIdleService {
     private void pollJobStatuses()
         throws IOException {
       this.failedDagIdsFinishRunning.clear();
-      for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
+
+      Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
+      List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
+      for (DagNode<JobExecutionPlan> node: this.jobToDag.keySet()) {
         long pollStartTime = System.nanoTime();
         JobStatus jobStatus = pollJobStatus(node);
         Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
-
-        Preconditions.checkNotNull(jobStatus, "Received null job status for a running job " + DagManagerUtils.getJobName(node));
+        if (jobStatus == null) {
+          continue;
+        }
         JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node);
 
         ExecutionStatus status = valueOf(jobStatus.getEventName());
-
         switch (status) {
           case COMPLETE:
             jobExecutionPlan.setExecutionStatus(COMPLETE);
-            onJobFinish(node);
+            nextSubmitted.putAll(onJobFinish(node));
+            nodesToCleanUp.add(node);
             break;
           case FAILED:
           case CANCELLED:
             jobExecutionPlan.setExecutionStatus(FAILED);
-            onJobFinish(node);
+            nextSubmitted.putAll(onJobFinish(node));
+            nodesToCleanUp.add(node);
             break;
           default:
             jobExecutionPlan.setExecutionStatus(RUNNING);
             break;
         }
       }
+
+      for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: nextSubmitted.entrySet()) {
+        String dagId = entry.getKey();
+        Set<DagNode<JobExecutionPlan>> dagNodes = entry.getValue();
+        for (DagNode dagNode: dagNodes) {
+          addJobState(dagId, dagNode);
+        }
+      }
+
+      for (DagNode<JobExecutionPlan> dagNode: nodesToCleanUp) {
+        String dagId = DagManagerUtils.generateDagId(this.jobToDag.get(dagNode));
+        deleteJobState(dagId, dagNode);
+      }
     }
 
     /**
@@ -357,16 +417,18 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    void submitNext(String dagId) throws IOException {
+    Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
       Dag<JobExecutionPlan> dag = this.dags.get(dagId);
       Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
       //Submit jobs from the dag ready for execution.
       for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
         submitJob(dagNode);
-        addJobState(dagId, dagNode);
       }
       //Checkpoint the dag state
       this.dagStateStore.writeCheckpoint(dag);
+      Map<String, Set<DagNode<JobExecutionPlan>>> dagIdToNextJobs = Maps.newHashMap();
+      dagIdToNextJobs.put(dagId, nextNodes);
+      return dagIdToNextJobs;
     }
 
     /**
@@ -418,7 +480,7 @@ public class DagManager extends AbstractIdleService {
      * Method that defines the actions to be performed when a job finishes either successfully or with failure.
      * This method updates the state of the dag and performs clean up actions as necessary.
      */
-    private void onJobFinish(DagNode<JobExecutionPlan> dagNode)
+    private Map<String, Set<DagNode<JobExecutionPlan>>> onJobFinish(DagNode<JobExecutionPlan> dagNode)
         throws IOException {
       Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
       String dagId = DagManagerUtils.generateDagId(dag);
@@ -426,10 +488,8 @@ public class DagManager extends AbstractIdleService {
       ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
       log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
 
-      deleteJobState(dagId, dagNode);
-
       if (jobStatus == COMPLETE) {
-        submitNext(dagId);
+        return submitNext(dagId);
       } else if (jobStatus == FAILED) {
         if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
           this.failedDagIdsFinishRunning.add(dagId);
@@ -437,6 +497,7 @@ public class DagManager extends AbstractIdleService {
           this.failedDagIdsFinishAllPossible.add(dagId);
         }
       }
+      return Maps.newHashMap();
     }
 
     private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
@@ -464,6 +525,7 @@ public class DagManager extends AbstractIdleService {
      * Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
      */
     private void cleanUp() {
+      List<String> dagIdsToClean = new ArrayList<>();
       //Clean up failed dags
       for (String dagId : this.failedDagIdsFinishRunning) {
         //Skip monitoring of any other jobs of the failed dag.
@@ -473,7 +535,7 @@ public class DagManager extends AbstractIdleService {
           deleteJobState(dagId, dagNode);
         }
         log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
-        cleanUpDag(dagId);
+        dagIdsToClean.add(dagId);
       }
 
       //Clean up completed dags
@@ -485,16 +547,20 @@ public class DagManager extends AbstractIdleService {
             this.failedDagIdsFinishAllPossible.remove(dagId);
           }
           log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
-          cleanUpDag(dagId);
+          dagIdsToClean.add(dagId);
         }
       }
+
+      for (String dagId: dagIdsToClean) {
+        cleanUpDag(dagId);
+      }
     }
 
     private void cleanUpDag(String dagId) {
       Dag<JobExecutionPlan> dag = this.dags.get(dagId);
       this.dagToJobs.remove(dagId);
-      this.dags.remove(dagId);
       this.dagStateStore.cleanUp(dag);
+      this.dags.remove(dagId);
     }
   }
 
@@ -504,5 +570,6 @@ public class DagManager extends AbstractIdleService {
       throws Exception {
     this.scheduledExecutorPool.shutdown();
     this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
+    if (this.jobStatusMonitor != null) { this.jobStatusMonitor.shutDown(); }
   }
 }
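
The reworked constructor supports two wiring modes, keyed off jobStatusMonitor.enabled.
A configuration sketch using the constants defined above; the dag state store keys the
constructor also requires are omitted for brevity:

    import com.google.common.collect.ImmutableMap;
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;

    public class DagManagerConfigSketch {
      public static void main(String[] args) {
        // Default mode: the Kafka job status monitor is enabled and the
        // FsJobStatusRetriever is selected automatically.
        Config withMonitor = ConfigFactory.parseMap(ImmutableMap.of(
            "gobblin.service.dagManager.dagStateStoreDir", "/tmp/dagStateStore"));

        // Monitor disabled: a retriever implementation must be named explicitly
        // under gobblin.service.dagManager.jobStatusRetriever.class.
        Config withoutMonitor = ConfigFactory.parseMap(ImmutableMap.of(
            "jobStatusMonitor.enabled", "false",
            "gobblin.service.dagManager.jobStatusRetriever.class",
            FsJobStatusRetriever.class.getName()));
      }
    }
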
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
new file mode 100644
index 0000000..46ba73b
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * A FileSystem based implementation of {@link JobStatusRetriever}. This implementation stores the job statuses
+ * as {@link org.apache.gobblin.configuration.State} objects in a {@link FsStateStore}.
+ * The store name is set to flowGroup.flowName, while the table name is set to flowExecutionId.jobGroup.jobName.gst.
+ */
+@Slf4j
+public class FsJobStatusRetriever extends JobStatusRetriever {
+  @Getter
+  private final FileContextBasedFsStateStore<State> stateStore;
+
+  public FsJobStatusRetriever(Config config) {
+    this.stateStore = (FileContextBasedFsStateStore<State>) new FileContextBasedFsStateStoreFactory().createStateStore(config, State.class);
+  }
+
+  @Override
+  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) {
+    Preconditions.checkArgument(flowName != null, "FlowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "FlowGroup cannot be null");
+
+    Predicate<String> flowExecutionIdPredicate = input -> input.startsWith(String.valueOf(flowExecutionId) + ".");
+    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+    try {
+      List<JobStatus> jobStatuses = new ArrayList<>();
+      List<String> tableNames = this.stateStore.getTableNames(storeName, flowExecutionIdPredicate);
+      for (String tableName: tableNames) {
+        List<State> jobStates = this.stateStore.getAll(storeName, tableName);
+        if (jobStates.isEmpty()) {
+          return Iterators.emptyIterator();
+        }
+        if (!shouldFilterJobStatus(tableNames, tableName)) {
+          jobStatuses.add(getJobStatus(jobStates.get(0)));
+        }
+      }
+      return jobStatuses.iterator();
+    } catch (IOException e) {
+      log.error("IOException encountered when retrieving job statuses for flow: {},{},{}", flowGroup, flowName, flowExecutionId, e);
+      return Iterators.emptyIterator();
+    }
+  }
+
+  @Override
+  public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId,
+      String jobName, String jobGroup) {
+    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    Preconditions.checkArgument(jobName != null, "jobName cannot be null");
+    Preconditions.checkArgument(jobGroup != null, "jobGroup cannot be null");
+
+    try {
+      String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+      String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName,
+          KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+      List<State> jobStates = this.stateStore.getAll(storeName, tableName);
+      if (jobStates.isEmpty()) {
+        return Iterators.emptyIterator();
+      } else {
+        return Iterators.singletonIterator(getJobStatus(jobStates.get(0)));
+      }
+    } catch (IOException e) {
+      log.error("Exception encountered when listing files", e);
+      return Iterators.emptyIterator();
+    }
+  }
+
+  /**
+   * @param flowName the flow name
+   * @param flowGroup the flow group
+   * @return the latest flow execution id for the given flowName and flowGroup, or -1 if no such execution is found.
+   */
+  @Override
+  public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
+    Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+    Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+    try {
+      String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+      List<String> tableNames = this.stateStore.getTableNames(storeName, input -> true);
+      if (tableNames.isEmpty()) {
+        return -1L;
+      }
+      Collections.sort(tableNames);
+      return getExecutionIdFromTableName(tableNames.get(tableNames.size() - 1));
+    } catch (Exception e) {
+      return -1L;
+    }
+  }
+
+  /**
+   * Deserialize a {@link State} object into a {@link JobStatus}.
+   * @param jobState instance of {@link State}
+   * @return the {@link JobStatus} built from the state's properties.
+   */
+  private JobStatus getJobStatus(State jobState) {
+    String flowGroup = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    String flowName = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+    long flowExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+    String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+    String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+    long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
+    String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+    long startTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_START_TIME, "0"));
+    long endTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_END_TIME, "0"));
+    String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
+    String lowWatermark = jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
+    String highWatermark = jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
+    long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
+
+    return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
+        jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
+        lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
+        message(message).processedCount(processedCount).build();
+  }
+
+  private long getExecutionIdFromTableName(String tableName) {
+    return Long.parseLong(Splitter.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
+  }
+
+  /**
+   * A helper method to determine if {@link JobStatus}es for jobs without a jobGroup/jobName should be filtered out.
+   * Once a job has been orchestrated, {@link JobStatus}es without a jobGroup/jobName can be filtered out.
+   * @param tableNames all table names for the flow execution
+   * @param tableName the table name under consideration
+   * @return true if the {@link JobStatus} corresponding to tableName should be filtered out.
+   */
+  private boolean shouldFilterJobStatus(List<String> tableNames, String tableName) {
+    return tableNames.size() > 1 && JobStatusRetriever.NA_KEY
+        .equals(Splitter.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(1));
+  }
+}
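
Reading statuses back is symmetric with how KafkaJobStatusMonitor writes them. A minimal
sketch, assuming JobStatus exposes getters for its builder fields; the flow identifiers
are hypothetical:

    import java.util.Iterator;

    import com.typesafe.config.Config;

    import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
    import org.apache.gobblin.service.monitoring.JobStatus;

    public class RetrieverSketch {
      public static void run(Config config) {
        FsJobStatusRetriever retriever = new FsJobStatusRetriever(config);
        long executionId = retriever.getLatestExecutionIdForFlow("myFlowName", "myFlowGroup");
        Iterator<JobStatus> statuses =
            retriever.getJobStatusesForFlowExecution("myFlowName", "myFlowGroup", executionId);
        while (statuses.hasNext()) {
          JobStatus status = statuses.next();
          System.out.println(status.getJobName() + " -> " + status.getEventName());
        }
      }
    }
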
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
new file mode 100644
index 0000000..a776bd5
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+
+import com.codahale.metrics.Meter;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A job status monitor for Avro messages. Uses {@link GobblinTrackingEvent} schema to parse the messages and calls
+ * {@link #parseJobStatus(GobblinTrackingEvent)} for each received message.
+ */
+@Slf4j
+public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
+  private static final String JOB_STATUS_MONITOR_MESSAGE_PARSE_FAILURES = "jobStatusMonitor.messageParseFailures";
+
+  private final ThreadLocal<SpecificDatumReader<GobblinTrackingEvent>> reader;
+  private final ThreadLocal<BinaryDecoder> decoder;
+
+  private final SchemaVersionWriter schemaVersionWriter;
+  @Getter
+  private Meter messageParseFailures;
+
+  public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
+      throws IOException, ReflectiveOperationException {
+    super(topic, config, numThreads);
+    if (ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
+      KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().
+          create(ConfigUtils.configToProperties(config));
+      this.schemaVersionWriter = new SchemaRegistryVersionWriter(schemaRegistry, topic, Optional.of(GobblinTrackingEvent.SCHEMA$));
+    } else {
+      this.schemaVersionWriter = new FixedSchemaVersionWriter();
+    }
+    this.decoder = ThreadLocal.withInitial(() -> {
+      InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
+      return DecoderFactory.get().binaryDecoder(dummyInputStream, null);
+    });
+    this.reader = ThreadLocal.withInitial(() -> new SpecificDatumReader<>(GobblinTrackingEvent.SCHEMA$));
+  }
+
+  @Override
+  protected void createMetrics() {
+    super.createMetrics();
+    this.messageParseFailures = this.getMetricContext().meter(JOB_STATUS_MONITOR_MESSAGE_PARSE_FAILURES);
+  }
+
+  @Override
+  public org.apache.gobblin.configuration.State parseJobStatus(byte[] message)
+      throws IOException {
+    InputStream is = new ByteArrayInputStream(message);
+    schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+
+    Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
+    try {
+      GobblinTrackingEvent decodedMessage = this.reader.get().read(null, decoder);
+      return parseJobStatus(decodedMessage);
+    } catch (AvroRuntimeException | IOException exc) {
+      this.messageParseFailures.mark();
+      if (this.messageParseFailures.getFiveMinuteRate() < 1) {
+        log.warn("Unable to decode input message.", exc);
+      } else {
+        log.warn("Unable to decode input message.");
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Parse the {@link GobblinTrackingEvent}s to determine the {@link ExecutionStatus} of the job.
+   * @param event an instance of {@link GobblinTrackingEvent}
+   * @return job status as an instance of {@link org.apache.gobblin.configuration.State}
+   */
+  private org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event) {
+    if (!acceptEvent(event)) {
+      return null;
+    }
+    Properties properties = new Properties();
+    properties.putAll(event.getMetadata());
+
+    switch (event.getName()) {
+      case TimingEvent.FlowTimings.FLOW_COMPILED:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
+        break;
+      case TimingEvent.FlowTimings.FLOW_COMPILE_FAILED:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
+        break;
+      case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
+        break;
+      case TimingEvent.LauncherTimings.JOB_START:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+        break;
+      case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+        break;
+      case TimingEvent.LauncherTimings.JOB_FAILED:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
+        break;
+      default:
+        return null;
+    }
+    return new org.apache.gobblin.configuration.State(properties);
+  }
+
+
+  /**
+   * Filter for {@link GobblinTrackingEvent}. Used to quickly determine whether an event should be used to produce
+   * a {@link JobStatus}.
+   */
+  private boolean acceptEvent(GobblinTrackingEvent event) {
+    if ((!event.getMetadata().containsKey(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD)) ||
+        (!event.getMetadata().containsKey(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD)) ||
+        (!event.getMetadata().containsKey(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))) {
+      return false;
+    }
+    return true;
+  }
+}
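
The event-name mapping above can be exercised without a Kafka cluster by feeding
serialized bytes straight into parseJobStatus. A sketch, assuming messageBytes holds a
GobblinTrackingEvent serialized with the same SchemaVersionWriter the monitor is
configured with:

    import com.typesafe.config.Config;

    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.service.monitoring.JobStatusRetriever;
    import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;

    public class ParseJobStatusSketch {
      public static void run(Config config, byte[] messageBytes) throws Exception {
        KafkaAvroJobStatusMonitor monitor = new KafkaAvroJobStatusMonitor("myTopic", config, 1);
        State jobStatus = monitor.parseJobStatus(messageBytes);
        if (jobStatus == null) {
          // The bytes did not decode, or the event lacked the flow metadata acceptEvent() requires.
        } else {
          System.out.println(jobStatus.getProp(JobStatusRetriever.EVENT_NAME_FIELD));
        }
      }
    }
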
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
new file mode 100644
index 0000000..179a107
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import kafka.message.MessageAndMetadata;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A Kafka monitor that tracks {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s reporting statuses of
+ * running jobs. The job statuses are stored as {@link org.apache.gobblin.configuration.State} objects in
+ * a {@link FileContextBasedFsStateStore}.
+ */
+@Slf4j
+public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], byte[]> {
+  static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
+  public static final String JOB_STATUS_MONITOR_ENABLED_KEY =  JOB_STATUS_MONITOR_PREFIX + ".enabled";
+  //We use a table suffix that is different from the Gobblin job state store suffix "jst" to avoid confusion.
+  //"gst" refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
+  public static final String STATE_STORE_TABLE_SUFFIX = "gst";
+
+  static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
+  static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
+  static final String JOB_STATUS_MONITOR_CLASS_KEY = "class";
+  static final String DEFAULT_JOB_STATUS_MONITOR_CLASS = KafkaAvroJobStatusMonitor.class.getName();
+  static final String STATE_STORE_FACTORY_CLASS_KEY = "stateStoreFactoryClass";
+
+  private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
+  private static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
+  private static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
+
+  @Getter
+  private final StateStore<org.apache.gobblin.configuration.State> stateStore;
+  private final ScheduledExecutorService scheduledExecutorService;
+  private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
+      KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
+
+  public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
+      throws ReflectiveOperationException {
+    super(topic, config.withFallback(DEFAULTS), numThreads);
+    String stateStoreFactoryClass = ConfigUtils.getString(config, STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName());
+
+    this.stateStore =
+        ((StateStore.Factory) Class.forName(stateStoreFactoryClass).newInstance()).createStateStore(config, org.apache.gobblin.configuration.State.class);
+    this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
+  }
+
+  @Override
+  protected void startUp() {
+    super.startUp();
+    log.info("Scheduling state store cleaner..");
+    scheduledExecutorService.scheduleAtFixedRate(new StateStoreCleanerRunnable(this.config), 300, 86400L, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void shutDown() {
+     super.shutDown();
+     this.scheduledExecutorService.shutdown();
+     try {
+       this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+     } catch (InterruptedException e) {
+       log.error("Exception {} encountered when shutting down state store cleaner", e);
+     }
+  }
+
+  @Override
+  protected void createMetrics() {
+    super.createMetrics();
+  }
+
+  @Override
+  protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
+    try {
+      org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.message());
+      if (jobStatus != null) {
+        addJobStatusToStateStore(jobStatus);
+      }
+    } catch (IOException ioe) {
+      String messageStr = new String(message.message(), Charsets.UTF_8);
+      log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
+    }
+  }
+
+  /**
+   * Persist a job status to the underlying {@link StateStore}. The store name is built from the flow
+   * group and flow name, and the table name from the flow execution id, job group, job name and the
+   * "gst" suffix, all joined on {@link JobStatusRetriever#STATE_STORE_KEY_SEPARATION_CHARACTER}.
+   * For example, assuming "." as the separation character, a status for flow ("myFlowGroup", "myFlowName"),
+   * flow execution 1234 and job ("myJobGroup", "myJobName") is stored under store "myFlowGroup.myFlowName",
+   * table "1234.myJobGroup.myJobName.gst".
+   * @param jobStatus the job status {@link org.apache.gobblin.configuration.State} to persist
+   * @throws IOException if the job status cannot be written to the state store
+   */
+  private void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus)
+      throws IOException {
+    String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+    String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+    String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+    String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, JobStatusRetriever.NA_KEY);
+    String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
+    String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, STATE_STORE_TABLE_SUFFIX);
+    this.stateStore.put(storeName, tableName, jobStatus);
+  }
+
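+  /**
+   * Parse the payload of a Kafka message into a job status {@link org.apache.gobblin.configuration.State}.
+   * Implementations should return null for messages that are not job status events; such messages are
+   * skipped by {@link #processMessage(MessageAndMetadata)}.
+   * @param message the raw Kafka message payload
+   * @return the parsed job status, or null if the message should be ignored
+   * @throws IOException if the message cannot be deserialized
+   */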
+  public abstract org.apache.gobblin.configuration.State parseJobStatus(byte[] message) throws IOException;
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
new file mode 100644
index 0000000..6e9b64b
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * A factory implementation that returns a {@link KafkaJobStatusMonitor} instance.
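+ *
+ * <p>A minimal usage sketch (assuming the Guava Service lifecycle that {@link KafkaJobStatusMonitor}
+ * inherits via {@link org.apache.gobblin.runtime.kafka.HighLevelConsumer}):
+ * <pre>
+ *   KafkaJobStatusMonitor monitor = new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
+ *   monitor.startAsync().awaitRunning();
+ *   //...
+ *   monitor.shutDown();
+ * </pre>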
+ */
+@Slf4j
+public class KafkaJobStatusMonitorFactory {
+  private static final String KAFKA_SSL_CONFIG_PREFIX_KEY = "jobStatusMonitor.kafka.config";
+  private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX = "metrics.reporting.kafka.config";
+
+  public KafkaJobStatusMonitor createJobStatusMonitor(Config config)
+      throws ReflectiveOperationException {
+    Config jobStatusConfig = config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
+
+    String topic = jobStatusConfig.getString(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_TOPIC_KEY);
+    int numThreads = ConfigUtils.getInt(jobStatusConfig, KafkaJobStatusMonitor.JOB_STATUS_MONITOR_NUM_THREADS_KEY, 5);
+    Class<?> jobStatusMonitorClass = Class.forName(ConfigUtils.getString(jobStatusConfig, KafkaJobStatusMonitor.JOB_STATUS_MONITOR_CLASS_KEY,
+        KafkaJobStatusMonitor.DEFAULT_JOB_STATUS_MONITOR_CLASS));
+
+    Config kafkaSslConfig = ConfigUtils.getConfigOrEmpty(config, KAFKA_SSL_CONFIG_PREFIX_KEY).
+        withFallback(ConfigUtils.getConfigOrEmpty(config, DEFAULT_KAFKA_SSL_CONFIG_PREFIX));
+
+    boolean useSchemaRegistry = ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+        false);
+    Config schemaRegistryConfig = ConfigFactory.empty().withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+        ConfigValueFactory.fromAnyRef(useSchemaRegistry));
+    if (useSchemaRegistry) {
+      //Use KafkaAvroSchemaRegistry
+      schemaRegistryConfig = schemaRegistryConfig
+          .withValue(KafkaAvroSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, config.getValue(KafkaAvroSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL));
+      schemaRegistryConfig = schemaRegistryConfig.withValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE,
+          config.getValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE));
+    }
+    jobStatusConfig = jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
+    return (KafkaJobStatusMonitor) GobblinConstructorUtils.invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads);
+  }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
new file mode 100644
index 0000000..be42982
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class FsJobStatusRetrieverTest {
+  private FsJobStatusRetriever jobStatusRetriever;
+  private FileContextBasedFsStateStore fsStateStore;
+  private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
+
+  private String flowGroup = "myFlowGroup";
+  private String flowName = "myFlowName";
+  private String jobGroup;
+  private String myJobGroup = "myJobGroup";
+  private long jobExecutionId = 1111L;
+  private String message = "https://myServer:8143/1234/1111";
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(stateStoreDir);
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir));
+    this.jobStatusRetriever = new FsJobStatusRetriever(config);
+    this.fsStateStore = this.jobStatusRetriever.getStateStore();
+  }
+
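+  /**
+   * Write a job status into the state store, mirroring the key layout produced by
+   * KafkaJobStatusMonitor#addJobStatusToStateStore: passing jobName == JobStatusRetriever.NA_KEY
+   * simulates a flow-level (COMPILED) status; any other jobName simulates a RUNNING job status.
+   */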
+  private void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
+    Properties properties = new Properties();
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
+    properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+    if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
+      this.jobGroup = myJobGroup;
+      properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, myJobGroup);
+      properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(this.jobExecutionId));
+      properties.setProperty(TimingEvent.METADATA_MESSAGE, this.message);
+      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+    } else {
+      this.jobGroup = JobStatusRetriever.NA_KEY;
+      properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
+      properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
+    }
+    properties.setProperty(TimingEvent.METADATA_START_TIME, "1");
+    properties.setProperty(TimingEvent.METADATA_END_TIME, "2");
+    State jobStatus = new State(properties);
+
+    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+    String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+
+    this.fsStateStore.put(storeName, tableName, jobStatus);
+  }
+
+  @Test
+  public void testGetJobStatusesForFlowExecution() throws IOException {
+    Long flowExecutionId = 1234L;
+    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY);
+
+    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+    Assert.assertTrue(jobStatusIterator.hasNext());
+    JobStatus jobStatus = jobStatusIterator.next();
+    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPILED.name());
+    Assert.assertEquals(jobStatus.getJobName(), JobStatusRetriever.NA_KEY);
+    Assert.assertEquals(jobStatus.getJobGroup(), JobStatusRetriever.NA_KEY);
+    Assert.assertEquals(jobStatus.getProcessedCount(), 0);
+    Assert.assertEquals(jobStatus.getLowWatermark(), "");
+    Assert.assertEquals(jobStatus.getHighWatermark(), "");
+
+    addJobStatusToStateStore(flowExecutionId,"myJobName1");
+    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+    jobStatus = jobStatusIterator.next();
+    Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
+    Assert.assertEquals(jobStatus.getJobName(), "myJobName1");
+    Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
+    Assert.assertFalse(jobStatusIterator.hasNext());
+
+    addJobStatusToStateStore(flowExecutionId,"myJobName2");
+    jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+    Assert.assertTrue(jobStatusIterator.hasNext());
+    jobStatus = jobStatusIterator.next();
+    Assert.assertTrue(jobStatus.getJobName().equals("myJobName1") || jobStatus.getJobName().equals("myJobName2"));
+
+    String jobName = jobStatus.getJobName();
+    String nextExpectedJobName = ("myJobName1".equals(jobName)) ? "myJobName2" : "myJobName1";
+    Assert.assertTrue(jobStatusIterator.hasNext());
+    jobStatus = jobStatusIterator.next();
+    Assert.assertEquals(jobStatus.getJobName(), nextExpectedJobName);
+  }
+
+  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
+  public void testGetJobStatusesForFlowExecution1() {
+    long flowExecutionId = 1234L;
+    String jobName = "myJobName1";
+    String jobGroup = "myJobGroup";
+    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+
+    Assert.assertTrue(jobStatusIterator.hasNext());
+    JobStatus jobStatus = jobStatusIterator.next();
+    Assert.assertEquals(jobStatus.getJobName(), jobName);
+    Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
+    Assert.assertEquals(jobStatus.getJobExecutionId(), jobExecutionId);
+    Assert.assertEquals(jobStatus.getFlowName(), flowName);
+    Assert.assertEquals(jobStatus.getFlowGroup(), flowGroup);
+    Assert.assertEquals(jobStatus.getFlowExecutionId(), flowExecutionId);
+    Assert.assertEquals(jobStatus.getMessage(), message);
+  }
+
+  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
+  public void testGetLatestExecutionIdForFlow() throws Exception {
+    //Add new flow execution to state store
+    long flowExecutionId = 1235L;
+    addJobStatusToStateStore(flowExecutionId, "myJobName1");
+    long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
+    Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId);
+
+    //Remove all flow executions from state store
+    cleanUpDir(stateStoreDir);
+    latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
+    Assert.assertEquals(latestExecutionIdForFlow, -1L);
+  }
+
+  private void cleanUpDir(String dir) throws Exception {
+    File specStoreDir = new File(dir);
+    if (specStoreDir.exists()) {
+      FileUtils.deleteDirectory(specStoreDir);
+    }
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    cleanUpDir(stateStoreDir);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
new file mode 100644
index 0000000..d34fafd
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.monitoring;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class KafkaAvroJobStatusMonitorTest {
+  public static final String TOPIC = KafkaAvroJobStatusMonitorTest.class.getSimpleName();
+
+  private KafkaTestBase kafkaTestHelper;
+  private String flowGroup = "myFlowGroup";
+  private String flowName = "myFlowName";
+  private String jobGroup = "myJobGroup";
+  private String jobName = "myJobName";
+  private String flowExecutionId = "1234";
+  private String jobExecutionId = "1111";
+  private String message = "https://myServer:8143/1234/1111";
+  private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(stateStoreDir);
+    kafkaTestHelper = new KafkaTestBase();
+    kafkaTestHelper.startServers();
+    kafkaTestHelper.provisionTopic(TOPIC);
+
+    // Create KeyValueProducerPusher instance.
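+    // "localhost:dummy" is a placeholder broker list; the actual bootstrap server
+    // (localhost:<test broker port>) is supplied through the config override below.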
+    Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC,
+        Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
+
+    //Create an event reporter instance
+    MetricContext context = MetricContext.builder("context").build();
+    KafkaAvroEventKeyValueReporter.Builder<?> builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+    builder = builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic");
+
+    //Submit GobblinTrackingEvents to Kafka
+    GobblinTrackingEvent event1 = createFlowCompiledEvent();
+    context.submitEvent(event1);
+    kafkaReporter.report();
+
+    GobblinTrackingEvent event2 = createJobOrchestratedEvent();
+    context.submitEvent(event2);
+    kafkaReporter.report();
+
+    GobblinTrackingEvent event3 = createJobStartEvent();
+    context.submitEvent(event3);
+    kafkaReporter.report();
+
+    GobblinTrackingEvent event4 = createJobSucceededEvent();
+    context.submitEvent(event4);
+    kafkaReporter.report();
+
+    GobblinTrackingEvent event5 = createDummyEvent();
+    context.submitEvent(event5);
+    kafkaReporter.report();
+
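+    // Wait briefly for the reported events to be published to Kafka before the tests consume them.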
+    try {
+      Thread.sleep(1000);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Test
+  public void testProcessMessage() throws IOException, ReflectiveOperationException {
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    KafkaJobStatusMonitor jobStatusMonitor = new KafkaAvroJobStatusMonitor("test", config, 1);
+
+    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
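+    // Event 1: flow compiled; the flow-level status should be persisted as COMPILED.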
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowGroup, this.flowName);
+    String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowExecutionId, "NA", "NA",
+        KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+    List<State> stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    State state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPILED.name());
+
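+    // Event 2: job orchestrated; the job-level status should be persisted as ORCHESTRATED.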
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(this.flowExecutionId, this.jobGroup, this.jobName,
+        KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.ORCHESTRATED.name());
+
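+    // Event 3: job started; the job-level status should advance to RUNNING.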
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.RUNNING.name());
+
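+    // Event 4: job succeeded; the job-level status should advance to COMPLETE.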
+    messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    stateList  = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    state = stateList.get(0);
+    Assert.assertEquals(state.getProp(JobStatusRetriever.EVENT_NAME_FIELD), ExecutionStatus.COMPLETE.name());
+
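+    // Event 5: a dummy event that is not a job status event; parseJobStatus() should return null.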
+    messageAndMetadata = iterator.next();
+    Assert.assertNull(jobStatusMonitor.parseJobStatus(messageAndMetadata.message()));
+  }
+
+  private GobblinTrackingEvent createFlowCompiledEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.FlowTimings.FLOW_COMPILED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.METADATA_START_TIME, "1");
+    metadata.put(TimingEvent.METADATA_END_TIME, "2");
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+    return event;
+  }
+
+  private GobblinTrackingEvent createJobOrchestratedEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.LauncherTimings.JOB_ORCHESTRATED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
+    metadata.put(TimingEvent.METADATA_START_TIME, "3");
+    metadata.put(TimingEvent.METADATA_END_TIME, "4");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+    return event;
+  }
+
+  private GobblinTrackingEvent createJobStartEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.LauncherTimings.JOB_START;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
+    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+    metadata.put(TimingEvent.METADATA_START_TIME, "5");
+    metadata.put(TimingEvent.METADATA_END_TIME, "6");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+    return event;
+  }
+
+  private GobblinTrackingEvent createJobSucceededEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = TimingEvent.LauncherTimings.JOB_SUCCEEDED;
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
+    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+    metadata.put(TimingEvent.METADATA_START_TIME, "7");
+    metadata.put(TimingEvent.METADATA_END_TIME, "8");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+    return event;
+  }
+
+  /**
+   *   Create a dummy event to test if it is filtered out by the consumer.
+   */
+  private GobblinTrackingEvent createDummyEvent() {
+    String namespace = "org.apache.gobblin.metrics";
+    Long timestamp = System.currentTimeMillis();
+    String name = "dummy";
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put("k1", "v1");
+    metadata.put("k2", "v2");
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent(timestamp, namespace, name, metadata);
+    return event;
+  }
+
+  private void cleanUpDir(String dir) throws Exception {
+    File specStoreDir = new File(dir);
+    if (specStoreDir.exists()) {
+      FileUtils.deleteDirectory(specStoreDir);
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    try {
+      this.kafkaTestHelper.close();
+      cleanUpDir(stateStoreDir);
+    } catch(Exception e) {
+      System.err.println("Failed to close Kafka server.");
+    }
+  }
+}
\ No newline at end of file