You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/09 18:55:22 UTC
incubator-gobblin git commit: [GOBBLIN-456] add option to delete state store
Repository: incubator-gobblin
Updated Branches:
refs/heads/master ed91dcdae -> ccd7ba769
[GOBBLIN-456] add option to delete state store
add option to delete state store
Closes #2327 from arjun4084346/addDeleteStateStoreOption
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ccd7ba76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ccd7ba76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ccd7ba76
Branch: refs/heads/master
Commit: ccd7ba769308e720db33ea800d964df43df4e878
Parents: ed91dcd
Author: Arjun <ab...@linkedin.com>
Authored: Mon Apr 9 11:54:31 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Apr 9 11:54:58 2018 -0700
----------------------------------------------------------------------
.../gobblin/runtime/api/SpecExecutor.java | 3 +-
.../gobblin/runtime/api/SpecProducer.java | 7 ++-
.../orchestration/AzkabanSpecProducer.java | 3 +-
.../orchestration/AzkabanProjectConfigTest.java | 11 ++--
.../service/SimpleKafkaSpecProducer.java | 6 +-
.../service/StreamingKafkaSpecConsumer.java | 4 +-
.../gobblin/service/FlowConfigClient.java | 18 ++++++
.../gobblin/service/FlowConfigsResource.java | 19 +++++-
.../org/apache/gobblin/runtime/api/JobSpec.java | 31 +++++++++-
.../gobblin/runtime/api/MutableSpecCatalog.java | 7 +--
.../apache/gobblin/runtime/api/SpecCatalog.java | 6 +-
.../runtime/api/SpecCatalogListener.java | 9 ++-
.../job_monitor/AvroJobSpecKafkaJobMonitor.java | 62 ++++++++++++++++----
.../runtime/job_monitor/KafkaJobMonitor.java | 24 ++------
.../runtime/job_spec/ResolvedJobSpec.java | 2 +-
.../runtime/spec_catalog/FlowCatalog.java | 10 +++-
.../spec_catalog/SpecCatalogListenersList.java | 5 +-
.../runtime/spec_catalog/TopologyCatalog.java | 9 ++-
.../InMemorySpecProducer.java | 3 +-
.../job_monitor/KafkaJobMonitorTest.java | 18 ------
.../job_monitor/MockedKafkaJobMonitor.java | 1 +
.../modules/flow/BaseFlowToJobSpecCompiler.java | 8 ++-
.../modules/orchestration/Orchestrator.java | 13 ++--
.../scheduler/GobblinServiceJobScheduler.java | 8 ++-
24 files changed, 198 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
index cb5197a..85ee7af 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -58,7 +58,8 @@ public interface SpecExecutor {
public static enum Verb {
ADD(1, "add"),
UPDATE(2, "update"),
- DELETE(3, "delete");
+ DELETE(3, "delete"),
+ UNKNOWN(4, "unknown");
private int _id;
private String _verb;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
index 9b9e504..880847d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.gobblin.annotation.Alpha;
@@ -38,8 +39,12 @@ public interface SpecProducer<V> {
/** Update a {@link Spec} being executed on {@link SpecExecutor}. */
Future<?> updateSpec(V updatedSpec);
+ default Future<?> deleteSpec(URI deletedSpecURI) {
+ return deleteSpec(deletedSpecURI, new Properties());
+ }
+
/** Delete a {@link Spec} being executed on {@link SpecExecutor}. */
- Future<?> deleteSpec(URI deletedSpecURI);
+ Future<?> deleteSpec(URI deletedSpecURI, Properties headers);
/** List all {@link Spec} being executed on {@link SpecExecutor}. */
Future<? extends List<V>> listSpecs();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
index 7b11cef..a1ae133 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.commons.codec.EncoderException;
@@ -139,7 +140,7 @@ public class AzkabanSpecProducer implements SpecProducer<Spec>, Closeable {
}
@Override
- public Future<?> deleteSpec(URI deletedSpecURI) {
+ public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
// Delete project
JobSpec jobSpec = new JobSpec.Builder(deletedSpecURI).build();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
index 9e189ab..3a48806 100644
--- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
+++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
import java.net.URI;
+import java.util.Collections;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
@@ -39,7 +40,7 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
JobSpec jobSpec = new JobSpec(new URI("uri"), "0.0", "test job spec",
- ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+ ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -54,7 +55,7 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix");
JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec",
- ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+ ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -69,7 +70,7 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName");
JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"),
- "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+ "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -84,7 +85,7 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix");
JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec",
- ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+ ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename();
@@ -99,7 +100,7 @@ public class AzkabanProjectConfigTest {
Properties properties = new Properties();
properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName");
JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"),
- "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent());
+ "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP);
AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec);
String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index a5163db..c56593c 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.Future;
+import java.util.Properties;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
@@ -101,10 +102,11 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
}
@Override
- public Future<?> deleteSpec(URI deletedSpecURI) {
+ public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
- .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name())).build();
+ .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name()))
+ .setProperties(Maps.fromProperties(headers)).build();
log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 6d8de39..ef44c7d 100644
--- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -224,8 +224,8 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S
}
private long getRemovedSpecs() {
- return StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs() != null?
- StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount() : 0;
+ return StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs() != null?
+ StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs().getCount() : 0;
}
private long getMessageParseFailures() {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
index 28255bb..a1c983e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
@@ -52,6 +52,7 @@ public class FlowConfigClient implements Closeable {
private Optional<HttpClientFactory> _httpClientFactory;
private Optional<RestClient> _restClient;
private final FlowconfigsRequestBuilders _flowconfigsRequestBuilders;
+ public static final String DELETE_STATE_STORE_KEY = "delete.state.store";
/**
* Construct a {@link FlowConfigClient} to communicate with http flow config server at URI serverUri
@@ -156,6 +157,23 @@ public class FlowConfigClient implements Closeable {
response.getResponse();
}
+ /**
+ * Delete a flow configuration and request deletion of its state store
+ * @param flowId identifier of flow configuration to delete
+ * @throws RemoteInvocationException
+ */
+ public void deleteFlowConfigWithStateStore(FlowId flowId)
+ throws RemoteInvocationException {
+ LOG.debug("deleteFlowConfig and state store with groupName " + flowId.getFlowGroup() + " flowName " +
+ flowId.getFlowName());
+
+ DeleteRequest<FlowConfig> deleteRequest = _flowconfigsRequestBuilders.delete()
+ .id(new ComplexResourceKey<>(flowId, new EmptyRecord())).setHeader(DELETE_STATE_STORE_KEY, Boolean.TRUE.toString()).build();
+ ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest);
+
+ response.getResponse();
+ }
+
@Override
public void close()
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index f0bce17..9074a43 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.service;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -46,12 +48,17 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import com.google.common.collect.ImmutableSet;
+
+
/**
* Resource for handling flow configuration requests
*/
@RestLiCollection(name = "flowconfigs", namespace = "org.apache.gobblin.service", keyName = "id")
public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, EmptyRecord, FlowConfig> {
private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsResource.class);
+ private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");
+
@edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL")
public static FlowCatalog _globalFlowCatalog;
@@ -234,7 +241,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
"/" + flowGroup + "/" + flowName, null, null);
- getFlowCatalog().remove(flowUri);
+ getFlowCatalog().remove(flowUri, getHeaders());
return new UpdateResponse(HttpStatus.S_200_OK);
} catch (URISyntaxException e) {
@@ -244,6 +251,16 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
return null;
}
+ private Properties getHeaders() {
+ Properties headerProperties = new Properties();
+ for (Map.Entry<String, String> entry : getContext().getRequestHeaders().entrySet()) {
+ if (ALLOWED_METADATA.contains(entry.getKey())) {
+ headerProperties.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return headerProperties;
+ }
+
/***
* This method is to workaround injection issues where Service has only one active global FlowCatalog
* .. and is not able to inject it via RestLI bootstrap. We should remove this and make injected
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 0ae943e..203ea8d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -19,11 +19,13 @@ package org.apache.gobblin.runtime.api;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Map;
import java.util.Properties;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -34,8 +36,10 @@ import org.apache.gobblin.util.ConfigUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
/**
* Defines a Gobblin Job that can be run once, or multiple times. A {@link JobSpec} is
* {@link Configurable} so it has an associated {@link Config}, along with other mandatory
@@ -64,6 +68,12 @@ public class JobSpec implements Configurable, Spec {
/** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */
Optional<URI> templateURI;
+ /** Metadata can contain properties which are not a part of config, e.g. Verb */
+ Map<String, String> metadata;
+
+ /** A Verb identifies whether the Spec is for Add/Update/Delete */
+ public static final String VERB_KEY = "Verb";
+
public static Builder builder(URI jobSpecUri) {
return new Builder(jobSpecUri);
}
@@ -131,6 +141,7 @@ public class JobSpec implements Configurable, Spec {
private Optional<String> description = Optional.absent();
private Optional<URI> jobCatalogURI = Optional.absent();
private Optional<URI> templateURI = Optional.absent();
+ private Optional<Map> metadata = Optional.absent();
public Builder(URI jobSpecUri) {
Preconditions.checkNotNull(jobSpecUri);
@@ -156,7 +167,7 @@ public class JobSpec implements Configurable, Spec {
Preconditions.checkNotNull(this.uri);
Preconditions.checkNotNull(this.version);
return new JobSpec(getURI(), getVersion(), getDescription(), getConfig(),
- getConfigAsProperties(), getTemplateURI());
+ getConfigAsProperties(), getTemplateURI(), getMetadata());
}
/** The scheme and authority of the job catalog URI are used to generate JobSpec URIs from
@@ -289,6 +300,24 @@ public class JobSpec implements Configurable, Spec {
this.templateURI = Optional.of(templateURI);
return this;
}
+
+ public Map getDefaultMetadata() {
+ log.warn("Job Spec Verb is not provided, using type 'UNKNOWN'.");
+ return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name());
+ }
+
+ public Map getMetadata() {
+ if (!this.metadata.isPresent()) {
+ this.metadata = Optional.of(getDefaultMetadata());
+ }
+ return this.metadata.get();
+ }
+
+ public Builder withMetadata(Map<String, String> metadata) {
+ Preconditions.checkNotNull(metadata);
+ this.metadata = Optional.of(metadata);
+ return this;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 108a324..7a3e946 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -18,15 +18,12 @@
package org.apache.gobblin.runtime.api;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.ContextAwareTimer;
-import org.apache.gobblin.util.ConfigUtils;
import com.google.common.base.Optional;
import com.typesafe.config.Config;
@@ -51,7 +48,7 @@ public interface MutableSpecCatalog extends SpecCatalog {
* Removes an existing {@link Spec} with the given URI.
* Throws SpecNotFoundException if such {@link Spec} does not exist.
*/
- void remove(URI uri) throws SpecNotFoundException;
+ void remove(URI uri, Properties headers) throws SpecNotFoundException;
@Slf4j
public static class MutableStandardMetrics extends StandardMetrics {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 457be9a..024c20c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -19,13 +19,12 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.Collection;
-import java.util.List;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -33,7 +32,6 @@ import org.apache.gobblin.instrumented.GobblinMetricsKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.instrumented.StandardMetricsBridge;
import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareMetric;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
@@ -141,7 +139,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr
}
@Override
- public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) {
this.totalDeletedSpecs.incrementAndGet();
submitTrackingEvent(deletedSpecURI, deletedSpecVersion, SPEC_DELETED_OPERATION_TYPE);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
index 2b0aa40..1448231 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.runtime.api;
import java.net.URI;
+import java.util.Properties;
import com.google.common.base.Objects;
@@ -31,7 +32,7 @@ public interface SpecCatalogListener {
/**
* Invoked when a {@link Spec} gets removed from the catalog.
*/
- public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion);
+ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers);
/**
* Invoked when the contents of a {@link Spec} gets updated in the catalog.
@@ -56,18 +57,20 @@ public interface SpecCatalogListener {
public static class DeleteSpecCallback extends Callback<SpecCatalogListener, Void> {
private final URI _deletedSpecURI;
private final String _deletedSpecVersion;
+ private final Properties _headers;
- public DeleteSpecCallback(URI deletedSpecURI, String deletedSpecVersion) {
+ public DeleteSpecCallback(URI deletedSpecURI, String deletedSpecVersion, Properties headers) {
super(Objects.toStringHelper("onDeleteSpec")
.add("deletedSpecURI", deletedSpecURI)
.add("deletedSpecVersion", deletedSpecVersion)
.toString());
_deletedSpecURI = deletedSpecURI;
_deletedSpecVersion = deletedSpecVersion;
+ _headers = headers;
}
@Override public Void apply(SpecCatalogListener listener) {
- listener.onDeleteSpec(_deletedSpecURI, _deletedSpecVersion);
+ listener.onDeleteSpec(_deletedSpecURI, _deletedSpecVersion, _headers);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index 59733d3..e035326 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Properties;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -36,10 +37,12 @@ import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.JobSpecMonitorFactory;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.SpecExecutor.Verb;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import kafka.message.MessageAndMetadata;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -54,7 +57,7 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
public static final String CONFIG_PREFIX = "gobblin.jobMonitor.avroJobSpec";
public static final String TOPIC_KEY = "topic";
public static final String SCHEMA_VERSION_READER_CLASS = "versionReaderClass";
- protected static final String VERB_KEY = "Verb";
+ public static final String DELETE_STATE_STORE_KEY = "delete.state.store";
private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
SCHEMA_VERSION_READER_CLASS, FixedSchemaVersionWriter.class.getName()));
@@ -103,9 +106,9 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
}
/**
- * Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec} record.
+ * Creates a {@link JobSpec} from the {@link AvroJobSpec} record.
* @param record the record as an {@link AvroJobSpec}
- * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection} of {@link Either}
+ * @return a {@link JobSpec} wrapped in a {@link Collection} of {@link Either}
*/
@Override
public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
@@ -114,7 +117,7 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
Properties props = new Properties();
props.putAll(record.getProperties());
jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion())
- .withDescription(record.getDescription()).withConfigAsProperties(props);
+ .withDescription(record.getDescription()).withConfigAsProperties(props).withMetadata(record.getMetadata());
if (!record.getTemplateUri().isEmpty()) {
try {
@@ -124,17 +127,52 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
}
}
- String verbName = record.getMetadata().get(VERB_KEY);
- Verb verb = Verb.valueOf(verbName);
-
JobSpec jobSpec = jobSpecBuilder.build();
log.info("Parsed job spec " + jobSpec.toString());
- if (verb == Verb.ADD || verb == Verb.UPDATE) {
- return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
- } else {
- return Lists.newArrayList(Either.<JobSpec, URI>right(jobSpec.getUri()));
+ return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
+ }
+
+ @Override
+ protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
+ try {
+ Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
+ for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
+ JobSpec jobSpec = ((Either.Left<JobSpec, URI>)parsedMessage).getLeft();
+ if (jobSpec.getMetadata().get(JobSpec.VERB_KEY).equalsIgnoreCase(SpecExecutor.Verb.DELETE.name())) {
+ this.removedSpecs.inc();
+ URI jobSpecUri = jobSpec.getUri();
+ this.jobCatalog.remove(jobSpecUri);
+
+ // Refer to FlowConfigsResource#delete to understand the pattern of the flow URI
+ // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI
+ if (jobSpec.getConfig().hasPath(DELETE_STATE_STORE_KEY) &&
+ Boolean.parseBoolean(jobSpec.getConfig().getString(DELETE_STATE_STORE_KEY))) {
+ // Delete the job state if it is a delete spec request
+ String[] uriTokens = jobSpecUri.getPath().split("/");
+ if (null == this.datasetStateStore) {
+ log.warn("Job state store deletion failed as datasetstore is not initialized.");
+ continue;
+ }
+ if (uriTokens.length != 3) {
+ log.error("Invalid URI {}.", jobSpecUri);
+ continue;
+ }
+ String jobName = uriTokens[2];
+ this.datasetStateStore.delete(jobName);
+ log.info("JobSpec {} deleted with statestore.", jobSpecUri);
+ } else {
+ log.info("JobSpec {} deleted keeping statestore.", jobSpecUri);
+ }
+ } else {
+ this.newSpecs.inc();
+ this.jobCatalog.put(jobSpec);
+ }
+ }
+ } catch (IOException ioe) {
+ String messageStr = new String(message.message(), Charsets.UTF_8);
+ log.error(String.format("Failed to delete job/jobStateStore or parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 6902eae..9d79fc0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.typesafe.config.Config;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
@@ -52,13 +51,13 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
public static final String KAFKA_AUTO_OFFSET_RESET_KEY = KAFKA_JOB_MONITOR_PREFIX + ".auto.offset.reset";
public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
- private DatasetStateStore datasetStateStore;
- private final MutableJobCatalog jobCatalog;
+ protected DatasetStateStore datasetStateStore;
+ protected final MutableJobCatalog jobCatalog;
@Getter
- private Counter newSpecs;
+ protected Counter newSpecs;
@Getter
- private Counter remmovedSpecs;
+ protected Counter removedSpecs;
/**
* @return A collection of either {@link JobSpec}s to add/update or {@link URI}s to remove from the catalog,
@@ -81,7 +80,7 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
protected void createMetrics() {
super.createMetrics();
this.newSpecs = this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS);
- this.remmovedSpecs = this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_REMOVED_SPECS);
+ this.removedSpecs = this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_REMOVED_SPECS);
}
@VisibleForTesting
@@ -106,19 +105,8 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
this.newSpecs.inc();
this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
} else if (parsedMessage instanceof Either.Right) {
- this.remmovedSpecs.inc();
+ this.removedSpecs.inc();
this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
-
- // Refer FlowConfigsResources:delete to understand the pattern of flow URI
- // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI
- String[] uriTokens = ((URI)(((Either.Right) parsedMessage).getRight())).getPath().split("/");
- if (uriTokens.length == 3) {
- String jobName = uriTokens[2];
- // Delete the job state if it is a delete spec request
- if (this.datasetStateStore != null) {
- this.datasetStateStore.delete(jobName);
- }
- }
}
}
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
index 8847467..b64f178 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
@@ -59,7 +59,7 @@ public class ResolvedJobSpec extends JobSpec {
public ResolvedJobSpec(JobSpec other, JobCatalog catalog)
throws SpecNotFoundException, JobTemplate.TemplateException {
super(other.getUri(), other.getVersion(), other.getDescription(), resolveConfig(other, catalog),
- ConfigUtils.configToProperties(resolveConfig(other, catalog)), other.getTemplateURI());
+ ConfigUtils.configToProperties(resolveConfig(other, catalog)), other.getTemplateURI(), other.getMetadata());
this.originalJobSpec = other;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index f78be47..f9ae420 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -23,6 +23,8 @@ import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
+
import javax.annotation.Nonnull;
import org.apache.commons.lang3.SerializationUtils;
@@ -255,8 +257,12 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
}
- @Override
public void remove(URI uri) {
+ remove(uri, new Properties());
+ }
+
+ @Override
+ public void remove(URI uri, Properties headers) {
try {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(uri);
@@ -264,7 +270,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
log.info(String.format("Removing FlowSpec with URI: %s", uri));
specStore.deleteSpec(uri);
this.metrics.updateRemoveSpecTime(startTime);
- this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
+ this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, headers);
} catch (IOException e) {
throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
index cdb9379..f2cd04b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
@@ -56,6 +56,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.Properties;
import org.slf4j.Logger;
@@ -109,11 +110,11 @@ public class SpecCatalogListenersList implements SpecCatalogListener, SpecCatalo
}
@Override
- public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+ public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) {
Preconditions.checkNotNull(deletedSpecURI);
try {
- _disp.execCallbacks(new SpecCatalogListener.DeleteSpecCallback(deletedSpecURI, deletedSpecVersion));
+ _disp.execCallbacks(new SpecCatalogListener.DeleteSpecCallback(deletedSpecURI, deletedSpecVersion, headers));
} catch (InterruptedException e) {
getLog().warn("onDeleteSpec interrupted.");
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 5c25a67..a842abd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
@@ -241,14 +242,18 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
}
}
- @Override
public void remove(URI uri) {
+ remove(uri, new Properties());
+ }
+
+ @Override
+ public void remove(URI uri, Properties headers) {
try {
Preconditions.checkState(state() == Service.State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(uri);
log.info(String.format("Removing TopologySpec with URI: %s", uri));
- this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
+ this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, headers);
specStore.deleteSpec(uri);
} catch (IOException e) {
throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
index 80f64ec..cc74757 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.net.URI;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
@@ -65,7 +66,7 @@ public class InMemorySpecProducer implements SpecProducer<Spec>, Serializable {
}
@Override
- public Future<?> deleteSpec(URI deletedSpecURI) {
+ public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
if (!provisionedSpecs.containsKey(deletedSpecURI)) {
throw new RuntimeException("Spec not found: " + deletedSpecURI);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
index 57f99a9..c825a12 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
@@ -19,16 +19,12 @@ package org.apache.gobblin.runtime.job_monitor;
import java.net.URI;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.typesafe.config.Config;
-import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
@@ -38,8 +34,6 @@ public class KafkaJobMonitorTest {
public void test() throws Exception {
Config config = HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX));
- String stateStoreRootDir = config.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY);
- FileSystem fs = FileSystem.getLocal(new Configuration());
MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(config);
monitor.startAsync();
@@ -66,18 +60,6 @@ public class KafkaJobMonitorTest {
Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "2");
- monitor.getMockKafkaStream().pushToStream("/flow3/job3:1");
- monitor.awaitExactlyNSpecs(3);
- Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("/flow3/job3")));
-
- // TODO: Currently, state stores are not categorized by flow name.
- // This can lead to one job overwriting other jobs' job state.
- fs.create(new Path(stateStoreRootDir, "job3"));
- Assert.assertTrue(fs.exists(new Path(stateStoreRootDir, "job3")));
- monitor.getMockKafkaStream().pushToStream(MockedKafkaJobMonitor.REMOVE + ":/flow3/job3");
- monitor.awaitExactlyNSpecs(2);
- Assert.assertFalse(fs.exists(new Path(stateStoreRootDir, "job3")));
-
monitor.shutDown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
index 7d3ef37..9e55236 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
@@ -98,6 +98,7 @@ class MockedKafkaJobMonitor extends KafkaJobMonitor {
return jobCatalog;
}
+
@Override
public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message)
throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 855d692..e4c9ae3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -22,6 +22,8 @@ import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+
import javax.annotation.Nonnull;
import com.codahale.metrics.Meter;
@@ -158,8 +160,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
}
+ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+ onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
+ }
+
@Override
- public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+ public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) {
if (topologySpecMap.containsKey(deletedSpecURI)) {
topologySpecMap.remove(deletedSpecURI);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index e2d36aa..1b3907d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -22,6 +22,7 @@ import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
@@ -144,13 +145,17 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
}
}
+ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+ onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
+ }
+
/** {@inheritDoc} */
@Override
- public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) {
_log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion);
if (topologyCatalog.isPresent()) {
- this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion);
+ this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion, headers);
}
}
@@ -211,7 +216,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}
- public void remove(Spec spec) {
+ public void remove(Spec spec, Properties headers) {
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all executions
@@ -232,7 +237,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
Spec jobSpec = specsToDelete.getKey();
_log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer));
- producer.deleteSpec(jobSpec.getUri());
+ producer.deleteSpec(jobSpec.getUri(), headers);
} catch(Exception e) {
_log.error("Cannot successfully delete spec: " + specsToDelete.getKey() + " on executor: " + producer +
" for flow: " + spec, e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ae18fc2..328c742 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -237,9 +237,13 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
}
+ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+ onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
+ }
+
/** {@inheritDoc} */
@Override
- public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+ public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) {
if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
// Specs in store will be notified when Scheduler is added as listener to FlowCatalog, so ignore
// .. Specs if in cluster mode and Helix is not yet initialized
@@ -259,7 +263,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
try {
Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString());
if (null != deletedSpec) {
- this.orchestrator.remove(deletedSpec);
+ this.orchestrator.remove(deletedSpec, headers);
this.scheduledFlowSpecs.remove(deletedSpecURI.toString());
unscheduleJob(deletedSpecURI.toString());
} else {