You are viewing a plain-text version of this content. (The canonical link for it was not preserved in this text.)
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/05 18:03:49 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-673] Implement a FS based JobStatusRetriever for GaaS Flows.
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9040c7a [GOBBLIN-673] Implement a FS based JobStatusRetriever for GaaS Flows.
9040c7a is described below
commit 9040c7a7e8215fbdd3f3840a0ca39dd4eea4ee4f
Author: suvasude <su...@linkedin.biz>
AuthorDate: Tue Feb 5 10:03:42 2019 -0800
[GOBBLIN-673] Implement a FS based JobStatusRetriever for GaaS Flows.
Closes #2545 from sv2000/kafkaTracking
---
.../metastore/FileContextBasedFsStateStore.java | 83 +++++++
.../FileContextBasedFsStateStoreFactory.java | 56 +++++
.../org/apache/gobblin/metastore/FsStateStore.java | 25 +-
.../gobblin/metastore/util/StateStoreCleaner.java | 3 +-
.../metastore/util/StateStoreCleanerRunnable.java | 58 +++++
.../apache/gobblin/metrics/event/TimingEvent.java | 3 +
gobblin-modules/gobblin-kafka-08/build.gradle | 10 +
.../gobblin/runtime/kafka/HighLevelConsumer.java | 19 +-
.../service/monitoring/JobStatusRetriever.java | 3 +
gobblin-service/build.gradle | 3 +
.../modules/core/GobblinServiceManager.java | 1 -
.../service/modules/orchestration/DagManager.java | 125 +++++++---
.../service/monitoring/FsJobStatusRetriever.java | 170 +++++++++++++
.../monitoring/KafkaAvroJobStatusMonitor.java | 160 ++++++++++++
.../service/monitoring/KafkaJobStatusMonitor.java | 137 +++++++++++
.../monitoring/KafkaJobStatusMonitorFactory.java | 67 ++++++
.../monitoring/FsJobStatusRetrieverTest.java | 169 +++++++++++++
.../monitoring/KafkaAvroJobStatusMonitorTest.java | 267 +++++++++++++++++++++
18 files changed, 1308 insertions(+), 51 deletions(-)
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
new file mode 100644
index 0000000..ade7a50
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStore.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.HadoopUtils;
+
+
+/**
+ * An implementation of {@link StateStore} backed by a {@link FileSystem}.
+ *
+ * <p>
+ * This implementation extends {@link FsStateStore} to use
+ * {@link org.apache.hadoop.fs.FileContext} APIs to persist state to the state store.
+ * The advantage of using {@link org.apache.hadoop.fs.FileContext} is that it provides an
+ * atomic rename-with-overwrite option which allows atomic update to a previously written
+ * state file.
+ * </p>
+ *
+ * @param <T> state object type
+ *
+ * @author Sudarshan Vasudevan
+ */
+
+public class FileContextBasedFsStateStore<T extends State> extends FsStateStore<T> {
+ private FileContext fc;
+
+ public FileContextBasedFsStateStore(String fsUri, String storeRootDir, Class stateClass)
+ throws IOException {
+ super(fsUri, storeRootDir, stateClass);
+ this.fc = FileContext.getFileContext(URI.create(fsUri));
+ }
+
+ public FileContextBasedFsStateStore(FileSystem fs, String storeRootDir, Class<T> stateClass)
+ throws UnsupportedFileSystemException {
+ super(fs, storeRootDir, stateClass);
+ this.fc = FileContext.getFileContext(this.fs.getUri());
+ }
+
+ public FileContextBasedFsStateStore(String storeUrl, Class<T> stateClass)
+ throws IOException {
+ super(storeUrl, stateClass);
+ this.fc = FileContext.getFileContext(this.fs.getUri());
+ }
+
+ /**
+ * See {@link StateStore#put(String, String, T)}.
+ *
+ * <p>
+ * This implementation uses {@link FileContext#rename(Path, Path, Options.Rename...)}, with
+ * {@link org.apache.hadoop.fs.Options.Rename#OVERWRITE} set to true, to write the
+ * state to the underlying state store.
+ * </p>
+ */
+ @Override
+ protected void renamePath(Path tmpTablePath, Path tablePath) throws IOException {
+ HadoopUtils.renamePath(this.fc, tmpTablePath, tablePath, true);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStoreFactory.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStoreFactory.java
new file mode 100644
index 0000000..9c68cc9
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FileContextBasedFsStateStoreFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class FileContextBasedFsStateStoreFactory implements StateStore.Factory {
+ @Override
+ public <T extends State> StateStore<T> createStateStore(Config config, Class<T> stateClass) {
+ // Add all job configuration properties so they are picked up by Hadoop
+ Configuration conf = new Configuration();
+ for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue().unwrapped().toString());
+ }
+
+ try {
+ String stateStoreFsUri = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_FS_URI_KEY,
+ ConfigurationKeys.LOCAL_FS_URI);
+ FileSystem stateStoreFs = FileSystem.get(URI.create(stateStoreFsUri), conf);
+ String stateStoreRootDir = config.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY);
+
+ return new FileContextBasedFsStateStore<T>(stateStoreFs, stateStoreRootDir, stateClass);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create FsStateStore with factory", e);
+ }
+
+ }
+}
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
index 4dd100d..34c0b8f 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
@@ -17,14 +17,11 @@
package org.apache.gobblin.metastore;
-import static org.apache.gobblin.util.HadoopUtils.FS_SCHEMES_NON_ATOMIC;
-
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
-import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -33,14 +30,18 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.WritableShimSerialization;
+import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
-import com.google.common.base.Predicate;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.WritableShimSerialization;
+import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
+
+import static org.apache.gobblin.util.HadoopUtils.FS_SCHEMES_NON_ATOMIC;
@@ -73,7 +74,7 @@ public class FsStateStore<T extends State> implements StateStore<T> {
protected final String storeRootDir;
// Class of the state objects to be put into the store
- private final Class<T> stateClass;
+ protected final Class<T> stateClass;
public FsStateStore(String fsUri, String storeRootDir, Class<T> stateClass) throws IOException {
this.conf = getConf(null);
@@ -174,7 +175,7 @@ public class FsStateStore<T extends State> implements StateStore<T> {
if (this.useTmpFileForPut) {
Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
- HadoopUtils.renamePath(this.fs, tmpTablePath, tablePath);
+ renamePath(tmpTablePath, tablePath);
}
}
@@ -211,10 +212,14 @@ public class FsStateStore<T extends State> implements StateStore<T> {
if (this.useTmpFileForPut) {
Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
- HadoopUtils.renamePath(this.fs, tmpTablePath, tablePath);
+ renamePath(tmpTablePath, tablePath);
}
}
+ protected void renamePath(Path tmpTablePath, Path tablePath) throws IOException {
+ HadoopUtils.renamePath(this.fs, tmpTablePath, tablePath);
+ }
+
@Override
@SuppressWarnings("unchecked")
public T get(String storeName, String tableName, String stateId) throws IOException {
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleaner.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleaner.java
index c1c3a0b..0cdde49 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleaner.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleaner.java
@@ -137,7 +137,8 @@ public class StateStoreCleaner implements Closeable {
public boolean accept(Path path) {
String fileName = path.getName();
String extension = Files.getFileExtension(fileName);
- return isStateMetaFile(fileName) || extension.equalsIgnoreCase("jst") || extension.equalsIgnoreCase("tst");
+ return isStateMetaFile(fileName) || extension.equalsIgnoreCase("jst") || extension.equalsIgnoreCase("tst") ||
+ (extension.equalsIgnoreCase("gst"));
}
boolean isStateMetaFile(String fileName) {
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
new file mode 100644
index 0000000..cbc7f2e
--- /dev/null
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/util/StateStoreCleanerRunnable.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metastore.util;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A utility class that wraps the {@link StateStoreCleaner} implementation as a {@link Runnable}.
+ */
+@Slf4j
+public class StateStoreCleanerRunnable implements Runnable {
+ private Properties properties;
+
+ public StateStoreCleanerRunnable(Config config) {
+ this.properties = ConfigUtils.configToProperties(config);
+ }
+
+ public void run() {
+ Closer closer = Closer.create();
+ try {
+ log.info("Attempting to clean state store..");
+ closer.register(new StateStoreCleaner(properties)).run();
+ log.info("State store clean up successful.");
+ } catch (IOException | ExecutionException e) {
+ log.error("Exception encountered during execution of {}", StateStoreCleaner.class.getName());
+ } finally {
+ try {
+ closer.close();
+ } catch (IOException e) {
+ log.error("Exception when closing the closer", e);
+ }
+ }
+ }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 0b3defa..3e0694d 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -68,6 +68,9 @@ public class TimingEvent {
public static final String JOB_GROUP_FIELD = "jobGroup";
public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
+ public static final String LOW_WATERMARK_FIELD = "lowWatermark";
+ public static final String HIGH_WATERMARK_FIELD = "highWatermark";
+ public static final String PROCESSED_COUNT_FIELD = "processedCount";
}
public static final String METADATA_START_TIME = "startTime";
diff --git a/gobblin-modules/gobblin-kafka-08/build.gradle b/gobblin-modules/gobblin-kafka-08/build.gradle
index 3314377..49c44a5 100644
--- a/gobblin-modules/gobblin-kafka-08/build.gradle
+++ b/gobblin-modules/gobblin-kafka-08/build.gradle
@@ -76,6 +76,16 @@ configurations {
// HADOOP-5254 and MAPREDUCE-5664
all*.exclude group: 'xml-apis'
all*.exclude group: 'xerces'
+ tests
+}
+
+task testJar(type: Jar, dependsOn: testClasses) {
+ baseName = "test-${project.archivesBaseName}"
+ from sourceSets.test.output
+}
+
+artifacts {
+ tests testJar
}
test {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index da2ac81..ae21921 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -33,13 +33,6 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.Tag;
-import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
-
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
@@ -50,6 +43,13 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
/**
* A high level consumer for Kafka topics. Subclasses should implement {@link HighLevelConsumer#processMessage(MessageAndMetadata)}
@@ -71,9 +71,9 @@ public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
private static final String DEFAULT_GROUP_ID = "KafkaJobSpecMonitor";
@Getter
- private final String topic;
+ protected final String topic;
+ protected final Config config;
private final int numThreads;
- private final Config config;
private final ConsumerConfig consumerConfig;
/**
@@ -203,5 +203,4 @@ public abstract class HighLevelConsumer<K, V> extends AbstractIdleService {
}
}
}
-
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index f64d136..59375b3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -29,6 +29,9 @@ import org.apache.gobblin.annotation.Alpha;
*/
@Alpha
public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker {
+ public static final String EVENT_NAME_FIELD = "eventName";
+ public static final String NA_KEY = "NA";
+ public static final String STATE_STORE_KEY_SEPARATION_CHARACTER = ".";
public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
long flowExecutionId);
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 3182080..d7e8ea2 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -71,6 +71,8 @@ dependencies {
compile externalDependency.zkClient
testCompile project(":gobblin-example")
+ testCompile project(path: ":gobblin-modules:gobblin-kafka-08:", configuration: "tests")
+ testCompile project(":gobblin-test-utils")
testCompile externalDependency.byteman
testCompile externalDependency.bytemanBmunit
testCompile externalDependency.calciteCore
@@ -81,6 +83,7 @@ dependencies {
testCompile externalDependency.hamcrest
testCompile externalDependency.jhyde
testCompile externalDependency.mockito
+ testCompile externalDependency.kafka08Test
}
// Begin HACK to get around POM being depenendent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a77833e..3e3429b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -87,7 +87,6 @@ import org.apache.gobblin.service.NoopRequesterService;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f3335bc..04a4c4e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -18,10 +18,12 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
@@ -32,7 +34,6 @@ import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
@@ -55,8 +56,11 @@ import org.apache.gobblin.service.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -89,14 +93,16 @@ import static org.apache.gobblin.service.ExecutionStatus.valueOf;
public class DagManager extends AbstractIdleService {
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
+ private static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
+ private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
private static final Integer DEFAULT_JOB_STATUS_POLLING_INTERVAL = 10;
private static final Integer DEFAULT_NUM_THREADS = 3;
private static final Integer TERMINATION_TIMEOUT = 30;
- private static final String DAG_MANAGER_PREFIX = "gobblin.service.dagManager.";
private static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads";
private static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval";
- private static final String JOB_STATUS_RETRIEVER_KEY = DAG_MANAGER_PREFIX + "jobStatusRetriever";
- private static final String DAG_STORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
+ private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = JOB_STATUS_RETRIEVER_KEY + ".class";
+ private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
+ private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
static final String DAG_STATESTORE_DIR = DAG_MANAGER_PREFIX + "dagStateStoreDir";
@@ -132,7 +138,9 @@ public class DagManager extends AbstractIdleService {
private final Integer numThreads;
private final Integer pollingInterval;
private final JobStatusRetriever jobStatusRetriever;
+ private final KafkaJobStatusMonitor jobStatusMonitor;
private final DagStateStore dagStateStore;
+
private volatile boolean isActive = false;
public DagManager(Config config, boolean instrumentationEnabled) {
@@ -141,12 +149,22 @@ public class DagManager extends AbstractIdleService {
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config, JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.instrumentationEnabled = instrumentationEnabled;
+ boolean jobStatusMonitorEnabled =
+ ConfigUtils.getBoolean(config, KafkaJobStatusMonitor.JOB_STATUS_MONITOR_ENABLED_KEY, true);
try {
- Class jobStatusRetrieverClass = Class.forName(config.getString(JOB_STATUS_RETRIEVER_KEY));
+ Class jobStatusRetrieverClass;
+ if (jobStatusMonitorEnabled) {
+ jobStatusRetrieverClass = Class.forName(DEFAULT_JOB_STATUS_RETRIEVER_CLASS);
+ this.jobStatusMonitor = new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
+ } else {
+ jobStatusRetrieverClass = Class.forName(config.getString(JOB_STATUS_RETRIEVER_CLASS_KEY));
+ this.jobStatusMonitor = null;
+ }
this.jobStatusRetriever =
- (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
- Class dagStateStoreClass = Class.forName(config.getString(DAG_STORE_CLASS_KEY));
+ (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass,
+ ConfigUtils.getConfigOrEmpty(config, JOB_STATUS_RETRIEVER_KEY));
+ Class dagStateStoreClass = Class.forName(config.getString(DAG_STATESTORE_CLASS_KEY));
this.dagStateStore = (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config);
} catch (ReflectiveOperationException e) {
throw new RuntimeException("Exception encountered during DagManager initialization", e);
@@ -162,11 +180,7 @@ public class DagManager extends AbstractIdleService {
*/
@Override
protected void startUp() {
- //On startup, the service creates tasks that are scheduled at a fixed rate.
- for (int i = 0; i < numThreads; i++) {
- this.scheduledExecutorPool.scheduleAtFixedRate(new DagManagerThread(jobStatusRetriever, dagStateStore, queue, instrumentationEnabled), 0, this.pollingInterval,
- TimeUnit.SECONDS);
- }
+ //Do nothing.
}
/**
@@ -190,15 +204,40 @@ public class DagManager extends AbstractIdleService {
* @param active a boolean to indicate if the {@link DagManager} is the leader.
*/
public synchronized void setActive(boolean active) {
+ if (this.isActive == active) {
+ log.info("DagManager already {}, skipping further actions.", (!active) ? "inactive" : "active");
+ return;
+ }
this.isActive = active;
try {
if (this.isActive) {
+ log.info("Scheduling {} DagManager threads", numThreads);
+ //On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
+ for (int i = 0; i < numThreads; i++) {
+ this.scheduledExecutorPool.scheduleAtFixedRate(new DagManagerThread(jobStatusRetriever, dagStateStore, queue, instrumentationEnabled), 0, this.pollingInterval,
+ TimeUnit.SECONDS);
+ }
+ if ((this.jobStatusMonitor != null) && (!this.jobStatusMonitor.isRunning())) {
+ log.info("Starting job status monitor");
+ jobStatusMonitor.startAsync().awaitRunning();
+ }
for (Dag<JobExecutionPlan> dag : dagStateStore.getDags()) {
offer(dag);
}
+ } else { //Mark the DagManager inactive.
+ log.info("Inactivating the DagManager. Shutting down all DagManager threads");
+ this.scheduledExecutorPool.shutdown();
+ try {
+ this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Exception {} encountered when shutting down DagManager threads.", e);
+ }
+ log.info("Shutting down JobStatusMonitor");
+ this.jobStatusMonitor.shutDown();
}
} catch (IOException e) {
- throw new RuntimeException("Exception encountered when activating the new DagManager", e);
+ log.error("Exception encountered when activating the new DagManager", e);
+ throw new RuntimeException(e);
}
}
@@ -299,7 +338,10 @@ public class DagManager extends AbstractIdleService {
}
log.info("Dag {} submitting jobs ready for execution.", dagId);
//Determine the next set of jobs to run and submit them for execution
- submitNext(dagId);
+ Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = submitNext(dagId);
+ for (DagNode dagNode: nextSubmitted.get(dagId)) {
+ addJobState(dagId, dagNode);
+ }
log.info("Dag {} Initialization complete.", dagId);
}
@@ -310,31 +352,49 @@ public class DagManager extends AbstractIdleService {
private void pollJobStatuses()
throws IOException {
this.failedDagIdsFinishRunning.clear();
- for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
+
+ Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
+ List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();
+ for (DagNode<JobExecutionPlan> node: this.jobToDag.keySet()) {
long pollStartTime = System.nanoTime();
JobStatus jobStatus = pollJobStatus(node);
Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS);
-
- Preconditions.checkNotNull(jobStatus, "Received null job status for a running job " + DagManagerUtils.getJobName(node));
+ if (jobStatus == null) {
+ continue;
+ }
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node);
ExecutionStatus status = valueOf(jobStatus.getEventName());
-
switch (status) {
case COMPLETE:
jobExecutionPlan.setExecutionStatus(COMPLETE);
- onJobFinish(node);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
break;
case FAILED:
case CANCELLED:
jobExecutionPlan.setExecutionStatus(FAILED);
- onJobFinish(node);
+ nextSubmitted.putAll(onJobFinish(node));
+ nodesToCleanUp.add(node);
break;
default:
jobExecutionPlan.setExecutionStatus(RUNNING);
break;
}
}
+
+ for (Map.Entry<String, Set<DagNode<JobExecutionPlan>>> entry: nextSubmitted.entrySet()) {
+ String dagId = entry.getKey();
+ Set<DagNode<JobExecutionPlan>> dagNodes = entry.getValue();
+ for (DagNode dagNode: dagNodes) {
+ addJobState(dagId, dagNode);
+ }
+ }
+
+ for (DagNode<JobExecutionPlan> dagNode: nodesToCleanUp) {
+ String dagId = DagManagerUtils.generateDagId(this.jobToDag.get(dagNode));
+ deleteJobState(dagId, dagNode);
+ }
}
/**
@@ -357,16 +417,18 @@ public class DagManager extends AbstractIdleService {
}
}
- void submitNext(String dagId) throws IOException {
+ Map<String, Set<DagNode<JobExecutionPlan>>> submitNext(String dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
Set<DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);
//Submit jobs from the dag ready for execution.
for (DagNode<JobExecutionPlan> dagNode : nextNodes) {
submitJob(dagNode);
- addJobState(dagId, dagNode);
}
//Checkpoint the dag state
this.dagStateStore.writeCheckpoint(dag);
+ Map<String, Set<DagNode<JobExecutionPlan>>> dagIdToNextJobs = Maps.newHashMap();
+ dagIdToNextJobs.put(dagId, nextNodes);
+ return dagIdToNextJobs;
}
/**
@@ -418,7 +480,7 @@ public class DagManager extends AbstractIdleService {
* Method that defines the actions to be performed when a job finishes either successfully or with failure.
* This method updates the state of the dag and performs clean up actions as necessary.
*/
- private void onJobFinish(DagNode<JobExecutionPlan> dagNode)
+ private Map<String, Set<DagNode<JobExecutionPlan>>> onJobFinish(DagNode<JobExecutionPlan> dagNode)
throws IOException {
Dag<JobExecutionPlan> dag = this.jobToDag.get(dagNode);
String dagId = DagManagerUtils.generateDagId(dag);
@@ -426,10 +488,8 @@ public class DagManager extends AbstractIdleService {
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
- deleteJobState(dagId, dagNode);
-
if (jobStatus == COMPLETE) {
- submitNext(dagId);
+ return submitNext(dagId);
} else if (jobStatus == FAILED) {
if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
this.failedDagIdsFinishRunning.add(dagId);
@@ -437,6 +497,7 @@ public class DagManager extends AbstractIdleService {
this.failedDagIdsFinishAllPossible.add(dagId);
}
}
+ return Maps.newHashMap();
}
private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
@@ -464,6 +525,7 @@ public class DagManager extends AbstractIdleService {
* Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
*/
private void cleanUp() {
+ List<String> dagIdstoClean = new ArrayList<>();
//Clean up failed dags
for (String dagId : this.failedDagIdsFinishRunning) {
//Skip monitoring of any other jobs of the failed dag.
@@ -473,7 +535,7 @@ public class DagManager extends AbstractIdleService {
deleteJobState(dagId, dagNode);
}
log.info("Dag {} has finished with status FAILED; Cleaning up dag from the state store.", dagId);
- cleanUpDag(dagId);
+ dagIdstoClean.add(dagId);
}
//Clean up completed dags
@@ -485,16 +547,20 @@ public class DagManager extends AbstractIdleService {
this.failedDagIdsFinishAllPossible.remove(dagId);
}
log.info("Dag {} has finished with status {}; Cleaning up dag from the state store.", dagId, status);
- cleanUpDag(dagId);
+ dagIdstoClean.add(dagId);
}
}
+
+ for (String dagId: dagIdstoClean) {
+ cleanUpDag(dagId);
+ }
}
+ /**
+ * Forgets a finished dag: removes its {@code dagToJobs} entry, deletes it from the dag state store, and only then
+ * removes it from {@code dags}.
+ * NOTE(review): with this ordering the {@code dags} entry survives if {@code dagStateStore.cleanUp} throws — confirm
+ * that this is intended.
+ */
private void cleanUpDag(String dagId) {
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
this.dagToJobs.remove(dagId);
- this.dags.remove(dagId);
this.dagStateStore.cleanUp(dag);
+ this.dags.remove(dagId);
}
}
@@ -504,5 +570,6 @@ public class DagManager extends AbstractIdleService {
throws Exception {
this.scheduledExecutorPool.shutdown();
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
+ this.jobStatusMonitor.shutDown();
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
new file mode 100644
index 0000000..46ba73b
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metrics.event.TimingEvent;
+
+
+/**
+ * A FileSystem based implementation of {@link JobStatusRetriever}. This implementation stores the job statuses
+ * as {@link org.apache.gobblin.configuration.State} objects in a {@link FsStateStore}.
+ * The store name is set to flowGroup.flowName, while the table name is set to flowExecutionId.jobGroup.jobName.
+ */
+@Slf4j
+public class FsJobStatusRetriever extends JobStatusRetriever {
+ @Getter
+ private final FileContextBasedFsStateStore<State> stateStore;
+
+ public FsJobStatusRetriever(Config config) {
+ this.stateStore = (FileContextBasedFsStateStore<State>) new FileContextBasedFsStateStoreFactory().createStateStore(config, State.class);
+ }
+
+ @Override
+ public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId) {
+ Preconditions.checkArgument(flowName != null, "FlowName cannot be null");
+ Preconditions.checkArgument(flowGroup != null, "FlowGroup cannot be null");
+
+ Predicate<String> flowExecutionIdPredicate = input -> input.startsWith(String.valueOf(flowExecutionId) + ".");
+ String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ try {
+ List<JobStatus> jobStatuses = new ArrayList<>();
+ List<String> tableNames = this.stateStore.getTableNames(storeName, flowExecutionIdPredicate);
+ for (String tableName: tableNames) {
+ List<State> jobStates = this.stateStore.getAll(storeName, tableName);
+ if (jobStates.isEmpty()) {
+ return Iterators.emptyIterator();
+ }
+ if (!shouldFilterJobStatus(tableNames, tableName)) {
+ jobStatuses.add(getJobStatus(jobStates.get(0)));
+ }
+ }
+ return jobStatuses.iterator();
+ } catch (IOException e) {
+ log.error("IOException encountered when retrieving job statuses for flow: {},{},{}", flowGroup, flowName, flowExecutionId, e);
+ return Iterators.emptyIterator();
+ }
+ }
+
+ @Override
+ public Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId,
+ String jobName, String jobGroup) {
+ Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+ Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+ Preconditions.checkArgument(jobName != null, "jobName cannot be null");
+ Preconditions.checkArgument(jobGroup != null, "jobGroup cannot be null");
+
+ try {
+ String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName,
+ KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+ List<State> jobStates = this.stateStore.getAll(storeName, tableName);
+ if (jobStates.isEmpty()) {
+ return Iterators.emptyIterator();
+ } else {
+ return Iterators.singletonIterator(getJobStatus(jobStates.get(0)));
+ }
+ } catch (IOException e) {
+ log.error("Exception encountered when listing files", e);
+ return Iterators.emptyIterator();
+ }
+ }
+
+ /**
+ * @param flowName
+ * @param flowGroup
+ * @return the latest flow execution id with the given flowName and flowGroup. -1 will be returned if no such execution found.
+ */
+ @Override
+ public long getLatestExecutionIdForFlow(String flowName, String flowGroup) {
+ Preconditions.checkArgument(flowName != null, "flowName cannot be null");
+ Preconditions.checkArgument(flowGroup != null, "flowGroup cannot be null");
+ try {
+ String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ List<String> tableNames = this.stateStore.getTableNames(storeName, input -> true);
+ if (tableNames.isEmpty()) {
+ return -1L;
+ }
+ Collections.sort(tableNames);
+ return getExecutionIdFromTableName(tableNames.get(tableNames.size() - 1));
+ } catch (Exception e) {
+ return -1L;
+ }
+ }
+
+ /**
+ *
+ * @param jobState instance of {@link State}
+ * @return deserialize {@link State} into a {@link JobStatus}.
+ */
+ private JobStatus getJobStatus(State jobState) {
+ String flowGroup = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String flowName = jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ long flowExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+ String jobName = jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
+ String jobGroup = jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
+ long jobExecutionId = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
+ String eventName = jobState.getProp(JobStatusRetriever.EVENT_NAME_FIELD);
+ long startTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_START_TIME, "0"));
+ long endTime = Long.parseLong(jobState.getProp(TimingEvent.METADATA_END_TIME, "0"));
+ String message = jobState.getProp(TimingEvent.METADATA_MESSAGE, "");
+ String lowWatermark = jobState.getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD, "");
+ String highWatermark = jobState.getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD, "");
+ long processedCount = Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.PROCESSED_COUNT_FIELD, "0"));
+
+ return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
+ jobName(jobName).jobGroup(jobGroup).jobExecutionId(jobExecutionId).eventName(eventName).
+ lowWatermark(lowWatermark).highWatermark(highWatermark).startTime(startTime).endTime(endTime).
+ message(message).processedCount(processedCount).build();
+ }
+
+ private long getExecutionIdFromTableName(String tableName) {
+ return Long.parseLong(Splitter.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(0));
+ }
+
+ /**
+ * A helper method to determine if {@link JobStatus}es for jobs without a jobGroup/jobName should be filtered out.
+ * Once a job has been orchestrated, {@link JobStatus}es without a jobGroup/jobName can be filtered out.
+ * @param tableNames
+ * @param tableName
+ * @return
+ */
+ private boolean shouldFilterJobStatus(List<String> tableNames, String tableName) {
+ return tableNames.size() > 1 && JobStatusRetriever.NA_KEY
+ .equals(Splitter.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).splitToList(tableName).get(1));
+ }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
new file mode 100644
index 0000000..a776bd5
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+
+import com.codahale.metrics.Meter;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistryFactory;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaRegistryVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A job status monitor for Avro messages. Uses {@link GobblinTrackingEvent} schema to parse the messages and calls
+ * {@link #parseJobStatus(GobblinTrackingEvent)} for each received message.
+ */
+@Slf4j
+public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
+ private static final String JOB_STATUS_MONITOR_MESSAGE_PARSE_FAILURES = "jobStatusMonitor.messageParseFailures";
+
+ private final ThreadLocal<SpecificDatumReader<GobblinTrackingEvent>> reader;
+ private final ThreadLocal<BinaryDecoder> decoder;
+
+ private final SchemaVersionWriter schemaVersionWriter;
+ @Getter
+ private Meter messageParseFailures;
+
+ public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
+ throws IOException, ReflectiveOperationException {
+ super(topic, config, numThreads);
+ if (ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
+ KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().
+ create(ConfigUtils.configToProperties(config));
+ this.schemaVersionWriter = new SchemaRegistryVersionWriter(schemaRegistry, topic, Optional.of(GobblinTrackingEvent.SCHEMA$));
+ } else {
+ this.schemaVersionWriter = new FixedSchemaVersionWriter();
+ }
+ this.decoder = ThreadLocal.withInitial(() -> {
+ InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
+ return DecoderFactory.get().binaryDecoder(dummyInputStream, null);
+ });
+ this.reader = ThreadLocal.withInitial(() -> new SpecificDatumReader<>(GobblinTrackingEvent.SCHEMA$));
+ }
+
+ @Override
+ protected void createMetrics() {
+ super.createMetrics();
+ this.messageParseFailures = this.getMetricContext().meter(JOB_STATUS_MONITOR_MESSAGE_PARSE_FAILURES);
+ }
+
+ @Override
+ public org.apache.gobblin.configuration.State parseJobStatus(byte[] message)
+ throws IOException {
+ InputStream is = new ByteArrayInputStream(message);
+ schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
+
+ Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
+ try {
+ GobblinTrackingEvent decodedMessage = this.reader.get().read(null, decoder);
+ return parseJobStatus(decodedMessage);
+ } catch (AvroRuntimeException | IOException exc) {
+ this.messageParseFailures.mark();
+ if (this.messageParseFailures.getFiveMinuteRate() < 1) {
+ log.warn("Unable to decode input message.", exc);
+ } else {
+ log.warn("Unable to decode input message.");
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Parse the {@link GobblinTrackingEvent}s to determine the {@link ExecutionStatus} of the job.
+ * @param event an instance of {@link GobblinTrackingEvent}
+ * @return job status as an instance of {@link org.apache.gobblin.configuration.State}
+ */
+ private org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event) {
+ if (!acceptEvent(event)) {
+ return null;
+ }
+ Properties properties = new Properties();
+ properties.putAll(event.getMetadata());
+
+ switch (event.getName()) {
+ case TimingEvent.FlowTimings.FLOW_COMPILED:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
+ break;
+ case TimingEvent.FlowTimings.FLOW_COMPILE_FAILED:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
+ break;
+ case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.ORCHESTRATED.name());
+ break;
+ case TimingEvent.LauncherTimings.JOB_START:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+ break;
+ case TimingEvent.LauncherTimings.JOB_SUCCEEDED:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPLETE.name());
+ break;
+ case TimingEvent.LauncherTimings.JOB_FAILED:
+ properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.FAILED.name());
+ break;
+ default:
+ return null;
+ }
+ return new org.apache.gobblin.configuration.State(properties);
+ }
+
+
+ /**
+ * Filter for {@link GobblinTrackingEvent}. Used to quickly determine whether an event should be used to produce
+ * a {@link JobStatus}.
+ */
+ private boolean acceptEvent(GobblinTrackingEvent event) {
+ if ((!event.getMetadata().containsKey(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD)) ||
+ (!event.getMetadata().containsKey(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD)) ||
+ (!event.getMetadata().containsKey(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
new file mode 100644
index 0000000..179a107
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import kafka.message.MessageAndMetadata;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.util.StateStoreCleanerRunnable;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A Kafka monitor that tracks {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s reporting statuses of
+ * running jobs. The job statuses are stored as {@link org.apache.gobblin.configuration.State} objects in
+ * a {@link FileContextBasedFsStateStore}.
+ */
+@Slf4j
+public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], byte[]> {
+ static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
+ public static final String JOB_STATUS_MONITOR_ENABLED_KEY = JOB_STATUS_MONITOR_PREFIX + ".enabled";
+ //We use table suffix that is different from the Gobblin job state store suffix of jst to avoid confusion.
+ //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
+ public static final String STATE_STORE_TABLE_SUFFIX = "gst";
+
+ static final String JOB_STATUS_MONITOR_TOPIC_KEY = "topic";
+ static final String JOB_STATUS_MONITOR_NUM_THREADS_KEY = "numThreads";
+ static final String JOB_STATUS_MONITOR_CLASS_KEY = "class";
+ static final String DEFAULT_JOB_STATUS_MONITOR_CLASS = KafkaAvroJobStatusMonitor.class.getName();
+ static final String STATE_STORE_FACTORY_CLASS_KEY = "stateStoreFactoryClass";
+
+ private static final String KAFKA_AUTO_OFFSET_RESET_KEY = "auto.offset.reset";
+ private static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
+ private static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
+
+ @Getter
+ private final StateStore<org.apache.gobblin.configuration.State> stateStore;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
+ KAFKA_AUTO_OFFSET_RESET_KEY, KAFKA_AUTO_OFFSET_RESET_SMALLEST));
+
+ public KafkaJobStatusMonitor(String topic, Config config, int numThreads)
+ throws ReflectiveOperationException {
+ super(topic, config.withFallback(DEFAULTS), numThreads);
+ String stateStoreFactoryClass = ConfigUtils.getString(config, STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName());
+
+ this.stateStore =
+ ((StateStore.Factory) Class.forName(stateStoreFactoryClass).newInstance()).createStateStore(config, org.apache.gobblin.configuration.State.class);
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ }
+
+ @Override
+ protected void startUp() {
+ super.startUp();
+ log.info("Scheduling state store cleaner..");
+ scheduledExecutorService.scheduleAtFixedRate(new StateStoreCleanerRunnable(this.config), 300, 86400L, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void shutDown() {
+ super.shutDown();
+ this.scheduledExecutorService.shutdown();
+ try {
+ this.scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Exception {} encountered when shutting down state store cleaner", e);
+ }
+ }
+
+ @Override
+ protected void createMetrics() {
+ super.createMetrics();
+ }
+
+ @Override
+ protected void processMessage(MessageAndMetadata<byte[],byte[]> message) {
+ try {
+ org.apache.gobblin.configuration.State jobStatus = parseJobStatus(message.message());
+ if (jobStatus != null) {
+ addJobStatusToStateStore(jobStatus);
+ }
+ } catch (IOException ioe) {
+ String messageStr = new String(message.message(), Charsets.UTF_8);
+ log.error(String.format("Failed to parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
+ }
+ }
+
+ /**
+ * Persist job status to the underlying {@link StateStore}.
+ * @param jobStatus
+ * @throws IOException
+ */
+ private void addJobStatusToStateStore(org.apache.gobblin.configuration.State jobStatus)
+ throws IOException {
+ String flowName = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
+ String flowGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
+ String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ String flowExecutionId = jobStatus.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ String jobName = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,JobStatusRetriever.NA_KEY);
+ String jobGroup = jobStatus.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
+ String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, STATE_STORE_TABLE_SUFFIX);
+ this.stateStore.put(storeName, tableName, jobStatus);
+ }
+
+ public abstract org.apache.gobblin.configuration.State parseJobStatus(byte[] message) throws IOException;
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
new file mode 100644
index 0000000..6e9b64b
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * A factory implementation that returns a {@link KafkaJobStatusMonitor} instance.
+ */
+@Slf4j
+public class KafkaJobStatusMonitorFactory {
+  private static final String KAFKA_SSL_CONFIG_PREFIX_KEY = "jobStatusMonitor.kafka.config";
+  private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX = "metrics.reporting.kafka.config";
+  private static final int DEFAULT_NUM_THREADS = 5;
+
+  /**
+   * Builds the job status monitor described by the {@code jobStatusMonitor} section of {@code config}.
+   * Kafka client settings are taken from {@value #KAFKA_SSL_CONFIG_PREFIX_KEY}, falling back to
+   * {@value #DEFAULT_KAFKA_SSL_CONFIG_PREFIX}. Schema registry settings are copied from the top-level config
+   * when {@link ConfigurationKeys#METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY} is enabled.
+   * @param config top-level service configuration
+   * @return a new, not yet started, {@link KafkaJobStatusMonitor}
+   * @throws ReflectiveOperationException if the configured monitor class cannot be loaded or instantiated
+   * @throws ClassCastException if the configured monitor class is not a {@link KafkaJobStatusMonitor}
+   */
+  public KafkaJobStatusMonitor createJobStatusMonitor(Config config)
+      throws ReflectiveOperationException {
+    Config jobStatusConfig = config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
+
+    String topic = jobStatusConfig.getString(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_TOPIC_KEY);
+    int numThreads = ConfigUtils.getInt(jobStatusConfig, KafkaJobStatusMonitor.JOB_STATUS_MONITOR_NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
+    // asSubclass fails fast with a clear ClassCastException instead of a raw Class and an unchecked cast later.
+    Class<? extends KafkaJobStatusMonitor> jobStatusMonitorClass =
+        Class.forName(ConfigUtils.getString(jobStatusConfig, KafkaJobStatusMonitor.JOB_STATUS_MONITOR_CLASS_KEY,
+            KafkaJobStatusMonitor.DEFAULT_JOB_STATUS_MONITOR_CLASS)).asSubclass(KafkaJobStatusMonitor.class);
+
+    Config kafkaSslConfig = ConfigUtils.getConfigOrEmpty(config, KAFKA_SSL_CONFIG_PREFIX_KEY).
+        withFallback(ConfigUtils.getConfigOrEmpty(config, DEFAULT_KAFKA_SSL_CONFIG_PREFIX));
+
+    boolean useSchemaRegistry = ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+        false);
+    Config schemaRegistryConfig = ConfigFactory.empty().withValue(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+        ConfigValueFactory.fromAnyRef(useSchemaRegistry));
+    if (useSchemaRegistry) {
+      //Use KafkaAvroSchemaRegistry
+      schemaRegistryConfig = schemaRegistryConfig
+          .withValue(KafkaAvroSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, config.getValue(KafkaAvroSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL));
+      schemaRegistryConfig = schemaRegistryConfig.withValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE,
+          config.getValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE));
+    }
+    // Keys in the jobStatusMonitor section take precedence over the Kafka and schema registry settings.
+    jobStatusConfig = jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
+    return (KafkaJobStatusMonitor) GobblinConstructorUtils.invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads);
+  }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
new file mode 100644
index 0000000..be42982
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+public class FsJobStatusRetrieverTest {
+ private FsJobStatusRetriever jobStatusRetriever;
+ private FileContextBasedFsStateStore fsStateStore;
+ private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
+
+ private String flowGroup = "myFlowGroup";
+ private String flowName = "myFlowName";
+ private String jobGroup;
+ private String myJobGroup = "myJobGroup";
+ private long jobExecutionId = 1111L;
+ private String message = "https://myServer:8143/1234/1111";
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ cleanUpDir(stateStoreDir);
+ Config config = ConfigFactory.empty().withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(stateStoreDir));
+ this.jobStatusRetriever = new FsJobStatusRetriever(config);
+ this.fsStateStore = this.jobStatusRetriever.getStateStore();
+ }
+
+ private void addJobStatusToStateStore(Long flowExecutionId, String jobName) throws IOException {
+ Properties properties = new Properties();
+ properties.setProperty(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+ properties.setProperty(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+ properties.setProperty(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(flowExecutionId));
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobName);
+ if (!jobName.equals(JobStatusRetriever.NA_KEY)) {
+ this.jobGroup = myJobGroup;
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, myJobGroup);
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, String.valueOf(this.jobExecutionId));
+ properties.setProperty(TimingEvent.METADATA_MESSAGE, this.message);
+ properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.RUNNING.name());
+ } else {
+ this.jobGroup = JobStatusRetriever.NA_KEY;
+ properties.setProperty(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, JobStatusRetriever.NA_KEY);
+ properties.setProperty(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.COMPILED.name());
+ }
+ properties.setProperty(TimingEvent.METADATA_START_TIME, "1");
+ properties.setProperty(TimingEvent.METADATA_END_TIME, "2");
+ State jobStatus = new State(properties);
+
+ String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowGroup, flowName);
+ String tableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER).join(flowExecutionId, jobGroup, jobName, KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+
+ this.fsStateStore.put(storeName, tableName, jobStatus);
+ }
+
+ @Test
+ public void testGetJobStatusesForFlowExecution() throws IOException {
+ Long flowExecutionId = 1234L;
+ addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY);
+
+ Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+ Assert.assertTrue(jobStatusIterator.hasNext());
+ JobStatus jobStatus = jobStatusIterator.next();
+ Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.COMPILED.name());
+ Assert.assertEquals(jobStatus.getJobName(), (JobStatusRetriever.NA_KEY));
+ Assert.assertEquals(jobStatus.getJobGroup(), JobStatusRetriever.NA_KEY);
+ Assert.assertEquals(jobStatus.getProcessedCount(), 0);
+ Assert.assertEquals(jobStatus.getLowWatermark(), "");
+ Assert.assertEquals(jobStatus.getHighWatermark(), "");
+
+ addJobStatusToStateStore(flowExecutionId,"myJobName1");
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+ jobStatus = jobStatusIterator.next();
+ Assert.assertEquals(jobStatus.getEventName(), ExecutionStatus.RUNNING.name());
+ Assert.assertEquals(jobStatus.getJobName(), "myJobName1");
+ Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
+ Assert.assertFalse(jobStatusIterator.hasNext());
+
+ addJobStatusToStateStore(flowExecutionId,"myJobName2");
+ jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId);
+ Assert.assertTrue(jobStatusIterator.hasNext());
+ jobStatus = jobStatusIterator.next();
+ Assert.assertTrue(jobStatus.getJobName().equals("myJobName1") || jobStatus.getJobName().equals("myJobName2"));
+
+ String jobName = jobStatus.getJobName();
+ String nextExpectedJobName = ("myJobName1".equals(jobName)) ? "myJobName2" : "myJobName1";
+ Assert.assertTrue(jobStatusIterator.hasNext());
+ jobStatus = jobStatusIterator.next();
+ Assert.assertEquals(jobStatus.getJobName(), nextExpectedJobName);
+ }
+
+ @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
+ public void testGetJobStatusesForFlowExecution1() {
+ long flowExecutionId = 1234L;
+ String jobName = "myJobName1";
+ String jobGroup = "myJobGroup";
+ Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+
+ Assert.assertTrue(jobStatusIterator.hasNext());
+ JobStatus jobStatus = jobStatusIterator.next();
+ Assert.assertEquals(jobStatus.getJobName(), jobName);
+ Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
+ Assert.assertEquals(jobStatus.getJobExecutionId(), jobExecutionId);
+ Assert.assertEquals(jobStatus.getFlowName(), flowName);
+ Assert.assertEquals(jobStatus.getFlowGroup(), flowGroup);
+ Assert.assertEquals(jobStatus.getFlowExecutionId(), flowExecutionId);
+ Assert.assertEquals(jobStatus.getMessage(), message);
+ }
+
+ @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
+ public void testGetLatestExecutionIdForFlow() throws Exception {
+ //Add new flow execution to state store
+ long flowExecutionId = 1235L;
+ addJobStatusToStateStore(flowExecutionId, "myJobName1");
+ long latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
+ Assert.assertEquals(latestExecutionIdForFlow, flowExecutionId);
+
+ //Remove all flow executions from state store
+ cleanUpDir(stateStoreDir);
+ latestExecutionIdForFlow = this.jobStatusRetriever.getLatestExecutionIdForFlow(flowName, flowGroup);
+ Assert.assertEquals(latestExecutionIdForFlow, -1L);
+ }
+
+ private void cleanUpDir(String dir) throws Exception {
+ File specStoreDir = new File(dir);
+ if (specStoreDir.exists()) {
+ FileUtils.deleteDirectory(specStoreDir);
+ }
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ cleanUpDir(stateStoreDir);
+ }
+}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
new file mode 100644
index 0000000..d34fafd
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitorTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.monitoring;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.kafka.KafkaTestBase;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaKeyValueProducerPusher;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.service.ExecutionStatus;
+
+
+/**
+ * Tests {@link KafkaAvroJobStatusMonitor}. {@link GobblinTrackingEvent}s are published to a test Kafka server
+ * started via {@link KafkaTestBase}, then each consumed message is fed to the monitor and the resulting state-store
+ * entries are checked.
+ */
+public class KafkaAvroJobStatusMonitorTest {
+  public static final String TOPIC = KafkaAvroJobStatusMonitorTest.class.getSimpleName();
+  private static final String EVENT_NAMESPACE = "org.apache.gobblin.metrics";
+
+  private final String flowGroup = "myFlowGroup";
+  private final String flowName = "myFlowName";
+  private final String jobGroup = "myJobGroup";
+  private final String jobName = "myJobName";
+  private final String flowExecutionId = "1234";
+  private final String jobExecutionId = "1111";
+  private final String message = "https://myServer:8143/1234/1111";
+  private final String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
+
+  private KafkaTestBase kafkaTestHelper;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(this.stateStoreDir);
+    this.kafkaTestHelper = new KafkaTestBase();
+    this.kafkaTestHelper.startServers();
+    this.kafkaTestHelper.provisionTopic(TOPIC);
+
+    // Create KeyValueProducerPusher instance.
+    Pusher pusher = new KafkaKeyValueProducerPusher<byte[], byte[]>("localhost:dummy", TOPIC,
+        Optional.of(ConfigFactory.parseMap(ImmutableMap.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + this.kafkaTestHelper.getKafkaServerPort()))));
+
+    // Create an event reporter that publishes through the pusher above.
+    // NOTE(review): context and kafkaReporter are never closed here; confirm whether that leaks producer resources.
+    MetricContext context = MetricContext.builder("context").build();
+    KafkaAvroEventKeyValueReporter.Builder<?> builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+    builder = builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+        TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
+    KafkaEventReporter kafkaReporter = builder.build("localhost:0000", "topic");
+
+    // Submit and report each event individually, in the order testProcessMessage consumes them.
+    List<GobblinTrackingEvent> events = Lists.newArrayList(createFlowCompiledEvent(), createJobOrchestratedEvent(),
+        createJobStartEvent(), createJobSucceededEvent(), createDummyEvent());
+    for (GobblinTrackingEvent event : events) {
+      context.submitEvent(event);
+      kafkaReporter.report();
+    }
+
+    // NOTE(review): fixed wait, presumably to let the producer deliver the events before the test consumes them.
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Test
+  public void testProcessMessage() throws IOException, ReflectiveOperationException {
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(this.stateStoreDir))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    KafkaJobStatusMonitor jobStatusMonitor = new KafkaAvroJobStatusMonitor("test", config, 1);
+    ConsumerIterator<byte[], byte[]> iterator = this.kafkaTestHelper.getIteratorForTopic(TOPIC);
+
+    String storeName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER)
+        .join(this.flowGroup, this.flowName);
+
+    // The flow-compiled event has no job name/group, so its status is expected under the "NA" job keys.
+    String flowTableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER)
+        .join(this.flowExecutionId, "NA", "NA", KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+    processNextMessageAndAssertStatus(iterator, jobStatusMonitor, storeName, flowTableName, ExecutionStatus.COMPILED);
+
+    // Each job event is expected to leave exactly one state in the job's table, holding the latest status.
+    String jobTableName = Joiner.on(JobStatusRetriever.STATE_STORE_KEY_SEPARATION_CHARACTER)
+        .join(this.flowExecutionId, this.jobGroup, this.jobName, KafkaJobStatusMonitor.STATE_STORE_TABLE_SUFFIX);
+    processNextMessageAndAssertStatus(iterator, jobStatusMonitor, storeName, jobTableName,
+        ExecutionStatus.ORCHESTRATED);
+    processNextMessageAndAssertStatus(iterator, jobStatusMonitor, storeName, jobTableName, ExecutionStatus.RUNNING);
+    processNextMessageAndAssertStatus(iterator, jobStatusMonitor, storeName, jobTableName, ExecutionStatus.COMPLETE);
+
+    // The dummy event is expected to be filtered out, i.e. parsed to null.
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+    Assert.assertNull(jobStatusMonitor.parseJobStatus(messageAndMetadata.message()));
+  }
+
+  /**
+   * Feeds the next consumed message to {@code jobStatusMonitor}, then asserts that exactly one state is stored under
+   * {@code storeName}/{@code tableName} and that its event name is {@code expectedStatus}.
+   */
+  private void processNextMessageAndAssertStatus(ConsumerIterator<byte[], byte[]> iterator,
+      KafkaJobStatusMonitor jobStatusMonitor, String storeName, String tableName, ExecutionStatus expectedStatus)
+      throws IOException, ReflectiveOperationException {
+    MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+    jobStatusMonitor.processMessage(messageAndMetadata);
+
+    StateStore stateStore = jobStatusMonitor.getStateStore();
+    List<State> stateList = stateStore.getAll(storeName, tableName);
+    Assert.assertEquals(stateList.size(), 1);
+    Assert.assertEquals(stateList.get(0).getProp(JobStatusRetriever.EVENT_NAME_FIELD), expectedStatus.name());
+  }
+
+  private GobblinTrackingEvent createFlowCompiledEvent() {
+    return createEvent(TimingEvent.FlowTimings.FLOW_COMPILED, createFlowMetadata("1", "2"));
+  }
+
+  private GobblinTrackingEvent createJobOrchestratedEvent() {
+    return createEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED, createJobMetadata("3", "4"));
+  }
+
+  private GobblinTrackingEvent createJobStartEvent() {
+    return createEvent(TimingEvent.LauncherTimings.JOB_START, createJobExecutionMetadata("5", "6"));
+  }
+
+  private GobblinTrackingEvent createJobSucceededEvent() {
+    return createEvent(TimingEvent.LauncherTimings.JOB_SUCCEEDED, createJobExecutionMetadata("7", "8"));
+  }
+
+  /**
+   * Create a dummy event to test if it is filtered out by the consumer.
+   */
+  private GobblinTrackingEvent createDummyEvent() {
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put("k1", "v1");
+    metadata.put("k2", "v2");
+    return createEvent("dummy", metadata);
+  }
+
+  /** Builds an event with the given name and metadata, timestamped now, in {@link #EVENT_NAMESPACE}. */
+  private GobblinTrackingEvent createEvent(String name, Map<String, String> metadata) {
+    Long timestamp = System.currentTimeMillis();
+    return new GobblinTrackingEvent(timestamp, EVENT_NAMESPACE, name, metadata);
+  }
+
+  /** Metadata identifying the flow execution, plus the given start and end times. */
+  private Map<String, String> createFlowMetadata(String startTime, String endTime) {
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
+    metadata.put(TimingEvent.METADATA_START_TIME, startTime);
+    metadata.put(TimingEvent.METADATA_END_TIME, endTime);
+    return metadata;
+  }
+
+  /** Flow metadata plus the job name and job group. */
+  private Map<String, String> createJobMetadata(String startTime, String endTime) {
+    Map<String, String> metadata = createFlowMetadata(startTime, endTime);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
+    return metadata;
+  }
+
+  /** Job metadata plus the job execution id and message. */
+  private Map<String, String> createJobExecutionMetadata(String startTime, String endTime) {
+    Map<String, String> metadata = createJobMetadata(startTime, endTime);
+    metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
+    metadata.put(TimingEvent.METADATA_MESSAGE, this.message);
+    return metadata;
+  }
+
+  /** Recursively deletes {@code dir} if it exists. */
+  private void cleanUpDir(String dir) throws IOException {
+    File directory = new File(dir);
+    if (directory.exists()) {
+      FileUtils.deleteDirectory(directory);
+    }
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    try {
+      this.kafkaTestHelper.close();
+    } catch (Exception e) {
+      // Best-effort shutdown: report the cause, but still clean up the state store below.
+      System.err.println("Failed to close Kafka server: " + e);
+    } finally {
+      cleanUpDir(this.stateStoreDir);
+    }
+  }
+}
\ No newline at end of file