You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:46 UTC
[28/50] incubator-gobblin git commit: [GOBBLIN-406] delete job state store on spec delete request
[GOBBLIN-406] delete job state store on spec delete request
Closes #2281 from arjun4084346/deleteInCluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f8950570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f8950570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f8950570
Branch: refs/heads/0.12.0
Commit: f89505702a3a130a7e73c617b33a285ea228bd8c
Parents: 5c678d9
Author: Arjun <ab...@linkedin.com>
Authored: Fri Feb 9 16:04:10 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Fri Feb 9 16:04:10 2018 -0800
----------------------------------------------------------------------
.../runtime/job_monitor/KafkaJobMonitor.java | 25 ++++++++++++++++-
.../job_monitor/KafkaJobMonitorTest.java | 24 +++++++++++++++--
.../runtime/kafka/HighLevelConsumerTest.java | 7 +++++
.../modules/flow/BaseFlowToJobSpecCompiler.java | 28 +++++++++++++++++++-
.../flow/MultiHopsFlowToJobSpecCompiler.java | 8 ++++--
5 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index ba79305..0bb4f14 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -26,6 +26,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
@@ -50,8 +52,9 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
public static final String KAFKA_AUTO_OFFSET_RESET_KEY = KAFKA_JOB_MONITOR_PREFIX + ".auto.offset.reset";
public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
-
+ private DatasetStateStore datasetStateStore;
private final MutableJobCatalog jobCatalog;
+
@Getter
private Counter newSpecs;
@Getter
@@ -67,6 +70,15 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
public KafkaJobMonitor(String topic, MutableJobCatalog catalog, Config config) {
super(topic, ConfigUtils.getConfigOrEmpty(config, KAFKA_JOB_MONITOR_PREFIX), 1);
this.jobCatalog = catalog;
+ try {
+ if (config.hasPath(ConfigurationKeys.STATE_STORE_ENABLED) &&
+ config.getBoolean(ConfigurationKeys.STATE_STORE_ENABLED) &&
+ config.hasPath(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY)) {
+ this.datasetStateStore = DatasetStateStore.buildDatasetStateStore(config);
+ }
+ } catch (IOException e) {
+ log.warn("DatasetStateStore could not be created.");
+ }
}
@Override
@@ -100,6 +112,17 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
} else if (parsedMessage instanceof Either.Right) {
this.remmovedSpecs.inc();
this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
+
+ // Refer to FlowConfigsResources:delete to understand the pattern of the flow URI.
+ // FlowToJobSpec compilers use the flowSpecURI to derive the jobSpecURI.
+ String[] uriTokens = ((URI)(((Either.Right) parsedMessage).getRight())).getPath().split("/");
+ if (uriTokens.length == 3) {
+ String jobName = uriTokens[2];
+ // Delete the job state if it is a delete spec request
+ if (this.datasetStateStore != null) {
+ this.datasetStateStore.delete(jobName);
+ }
+ }
}
}
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
index 1dd90b1..57f99a9 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
@@ -19,11 +19,16 @@ package org.apache.gobblin.runtime.job_monitor;
import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
@@ -32,8 +37,11 @@ public class KafkaJobMonitorTest {
@Test
public void test() throws Exception {
- MockedKafkaJobMonitor monitor =
- MockedKafkaJobMonitor.create(HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)));
+ Config config = HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX));
+ String stateStoreRootDir = config.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+
+ MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(config);
monitor.startAsync();
monitor.getMockKafkaStream().pushToStream("job1:1");
@@ -58,6 +66,18 @@ public class KafkaJobMonitorTest {
Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "2");
+ monitor.getMockKafkaStream().pushToStream("/flow3/job3:1");
+ monitor.awaitExactlyNSpecs(3);
+ Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("/flow3/job3")));
+
+ // TODO: Currently, state stores are not categorized by flow name.
+ // This can lead to one job overwriting other jobs' job state.
+ fs.create(new Path(stateStoreRootDir, "job3"));
+ Assert.assertTrue(fs.exists(new Path(stateStoreRootDir, "job3")));
+ monitor.getMockKafkaStream().pushToStream(MockedKafkaJobMonitor.REMOVE + ":/flow3/job3");
+ monitor.awaitExactlyNSpecs(2);
+ Assert.assertFalse(fs.exists(new Path(stateStoreRootDir, "job3")));
+
monitor.shutDown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
index 6e8d7a2..e8d4e6c 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime.kafka;
+import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
@@ -24,9 +25,11 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
+import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.job_monitor.MockKafkaStream;
@@ -35,6 +38,10 @@ public class HighLevelConsumerTest {
public static Config getSimpleConfig(Optional<String> prefix) {
Properties properties = new Properties();
properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper");
+ properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true");
+ File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+ properties.put(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, tmpDir.toString());
return ConfigFactory.parseProperties(properties);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index db92ef9..855d692 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -211,7 +211,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
*/
protected JobSpec jobSpecGenerator(FlowSpec flowSpec) {
JobSpec jobSpec;
- JobSpec.Builder jobSpecBuilder = JobSpec.builder(flowSpec.getUri())
+ JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowSpec))
.withConfig(flowSpec.getConfig())
.withDescription(flowSpec.getDescription())
.withVersion(flowSpec.getVersion());
@@ -254,6 +254,32 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
}
/**
+ * Accepts a variable number of parameters, as needed to generate a unique URI.
+ * The implementation is flowSpecCompiler dependent.
+ * This method should return a URI whose path has the job name as the third token when split by "/" (the first token is empty because of the leading "/"),
+ * e.g. /flowGroup/flowName
+ * /flowGroup/flowName/sourceNode-targetNode
+ * SafeDatasetCommit creates the state store using this name, and
+ * {@link org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor} extracts the job name to find the state store path.
+ * @param objects
+ * @return
+ */
+ public URI jobSpecURIGenerator(Object... objects) {
+ return ((FlowSpec)objects[0]).getUri();
+ }
+
+ /**
+ * Returns the template URI for the job.
+ * This method can be overridden by derived FlowToJobSpecCompiler classes.
+ * @param flowSpec
+ * @return template URI
+ */
+ protected URI jobSpecTemplateURIGenerator(FlowSpec flowSpec) {
+ // For now, only the first template URI will be honored for Identity
+ return flowSpec.getTemplateURIs().get().iterator().next();
+ }
+
+ /**
* Ideally each edge has its own eligible template repository(Based on {@link SpecExecutor})
* to pick templates from.
*
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
index ba5c203..544ca42 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -323,14 +323,18 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
private URI getTemplateURI (ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec, FlowEdge flowEdge) {
URI firstTemplateURI =
(edgeTemplateMap != null && edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? edgeTemplateMap.get(
- flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getTemplateURI().orNull();
+ flowEdge.getEdgeIdentity()).get(0) : jobSpecTemplateURIGenerator(flowSpec);
return firstTemplateURI;
}
/**
* A naive implementation of generating a jobSpec's URI within a multi-hop logical Flow.
*/
- public static URI jobSpecURIGenerator(FlowSpec flowSpec, ServiceNode sourceNode, ServiceNode targetNode) {
+ @Override
+ public URI jobSpecURIGenerator(Object... objects) {
+ FlowSpec flowSpec = (FlowSpec) objects[0];
+ ServiceNode sourceNode = (ServiceNode) objects[1];
+ ServiceNode targetNode = (ServiceNode) objects[2];
try {
return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, flowSpec.getUri().getAuthority(),
StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(), "/"),"/")