You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/04 20:24:43 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-720] Always delete state store
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d3eecb8 [GOBBLIN-720] Always delete state store
d3eecb8 is described below
commit d3eecb89da66119966cf5494167946dc94032f45
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Apr 4 13:24:37 2019 -0700
[GOBBLIN-720] Always delete state store
Closes #2587 from arjun4084346/alwaysDeleteStateStore
---
.../service/FlowConfigResourceLocalHandler.java | 58 +++++++++++++++++----
.../job_monitor/AvroJobSpecKafkaJobMonitor.java | 59 ++++------------------
.../runtime/job_monitor/KafkaJobMonitor.java | 29 ++++++++++-
...ControllerUserDefinedMessageHandlerFactory.java | 2 +-
4 files changed, 87 insertions(+), 61 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 405e8e3..9b05767 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -61,9 +61,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
log.info("[GAAS-REST] Get called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
try {
- URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
- URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
- "/" + flowId.getFlowGroup() + "/" + flowId.getFlowName(), null, null);
+ URI flowUri = FlowUriUtils.createFlowSpecUri(flowId);
FlowSpec spec = (FlowSpec) flowCatalog.getSpec(flowUri);
FlowConfig flowConfig = new FlowConfig();
Properties flowProps = spec.getConfigAsProperties();
@@ -165,7 +163,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
URI flowUri = null;
try {
- flowUri = createFlowSpecUri(flowId);
+ flowUri = FlowUriUtils.createFlowSpecUri(flowId);
this.flowCatalog.remove(flowUri, header, triggerListener);
return new UpdateResponse(HttpStatus.S_200_OK);
} catch (URISyntaxException e) {
@@ -180,13 +178,6 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
return deleteFlowConfig(flowId, header, true);
}
- public static URI createFlowSpecUri (FlowId flowId) throws URISyntaxException {
- URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
- URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
- "/" + flowId.getFlowGroup() + "/" + flowId.getFlowName(), null, null);
- return flowUri;
- }
-
/**
* Build a {@link FlowSpec} from a {@link FlowConfig}
* @param flowConfig flow configuration
@@ -236,4 +227,72 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getTemplateUris(), e);
}
}
+
+ /**
+ * Builds flow spec URIs and reads the flow group and flow name back out of them.
+ * A flow spec URI has scheme {@code gobblin-flow}, no authority, and path {@code /<flowGroup>/<flowName>}.
+ */
+ public static class FlowUriUtils {
+ private final static String URI_SCHEME = "gobblin-flow";
+ private final static String URI_AUTHORITY = null;
+ private final static String URI_PATH_SEPARATOR = "/";
+ private final static String URI_QUERY = null;
+ private final static String URI_FRAGMENT = null;
+ // A "/<flowGroup>/<flowName>" path splits on the separator into ["", flowGroup, flowName].
+ private final static int EXPECTED_NUM_URI_PATH_TOKENS = 3;
+
+ private FlowUriUtils() {
+ // Static helpers only; not meant to be instantiated.
+ }
+
+ /**
+ * Builds the flow spec URI for a flow.
+ * @param flowId flow whose group and name form the URI path
+ * @return URI with scheme {@code gobblin-flow} and path {@code /<flowGroup>/<flowName>}
+ * @throws URISyntaxException if a URI cannot be built from the flow group and name
+ */
+ public static URI createFlowSpecUri(FlowId flowId) throws URISyntaxException {
+ return new URI(URI_SCHEME, URI_AUTHORITY, createUriPath(flowId), URI_QUERY, URI_FRAGMENT);
+ }
+
+ private static String createUriPath(FlowId flowId) {
+ return URI_PATH_SEPARATOR + flowId.getFlowGroup() + URI_PATH_SEPARATOR + flowId.getFlowName();
+ }
+
+ /**
+ * Returns the flow name (last path segment) of a URI built by {@link #createFlowSpecUri(FlowId)}.
+ * @param flowUri flow spec URI
+ * @return the flow name, or null (after logging an error) if the provided flowUri is not valid
+ */
+ public static String getFlowName(URI flowUri) {
+ String[] uriTokens = getUriPathTokens(flowUri);
+ return uriTokens == null ? null : uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 1];
+ }
+
+ /**
+ * Returns the flow group (first path segment) of a URI built by {@link #createFlowSpecUri(FlowId)}.
+ * @param flowUri flow spec URI
+ * @return the flow group, or null (after logging an error) if the provided flowUri is not valid
+ */
+ public static String getFlowGroup(URI flowUri) {
+ String[] uriTokens = getUriPathTokens(flowUri);
+ return uriTokens == null ? null : uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 2];
+ }
+
+ /**
+ * Splits the path of a flow spec URI into its tokens; shared by {@link #getFlowName} and {@link #getFlowGroup}.
+ * @return the path tokens, or null (after logging an error) if the URI has no path or the path
+ * does not split into exactly {@code EXPECTED_NUM_URI_PATH_TOKENS} tokens
+ */
+ private static String[] getUriPathTokens(URI flowUri) {
+ String path = flowUri.getPath();
+ // getPath() is null for opaque URIs such as "gobblin-flow:foo"; report those as invalid instead of throwing an NPE.
+ String[] uriTokens = path == null ? null : path.split(URI_PATH_SEPARATOR);
+ if (uriTokens == null || uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
+ log.error("Invalid URI {}.", flowUri);
+ return null;
+ }
+ return uriTokens;
+ }
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index e035326..40e4e48 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Properties;
-import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -37,12 +36,12 @@ import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.JobSpecMonitorFactory;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecExecutor.Verb;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import kafka.message.MessageAndMetadata;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -57,8 +56,7 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
public static final String CONFIG_PREFIX = "gobblin.jobMonitor.avroJobSpec";
public static final String TOPIC_KEY = "topic";
public static final String SCHEMA_VERSION_READER_CLASS = "versionReaderClass";
- public static final String DELETE_STATE_STORE_KEY = "delete.state.store";
-
+ protected static final String VERB_KEY = "Verb";
private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
SCHEMA_VERSION_READER_CLASS, FixedSchemaVersionWriter.class.getName()));
@@ -106,9 +104,9 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
}
/**
- * Creates {@link JobSpec} from the {@link AvroJobSpec} record.
+ * Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec} record.
* @param record the record as an {@link AvroJobSpec}
- * @return a {@link JobSpec} wrapped in a {@link Collection} of {@link Either}
+ * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection} of {@link Either}
*/
@Override
public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
@@ -127,52 +125,21 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
}
}
+ // Only an explicit DELETE verb yields a removal (a URI result, for which KafkaJobMonitor#processMessage also
+ // deletes the job's state store). Any other verb is an add/update, as in the removed processMessage override,
+ // and so is a missing one. The null-safe, case-insensitive name comparison avoids Verb.valueOf(), whose
+ // exception on a missing or unrecognized verb would escape processMessage's IOException handler.
+ String verbName = record.getMetadata().get(VERB_KEY);
+ boolean isDelete = Verb.DELETE.name().equalsIgnoreCase(verbName);
+
JobSpec jobSpec = jobSpecBuilder.build();
log.info("Parsed job spec " + jobSpec.toString());
- return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
- }
-
- @Override
- protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
- try {
- Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
- for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
- JobSpec jobSpec = ((Either.Left<JobSpec, URI>)parsedMessage).getLeft();
- if (jobSpec.getMetadata().get(JobSpec.VERB_KEY).equalsIgnoreCase(SpecExecutor.Verb.DELETE.name())) {
- this.removedSpecs.inc();
- URI jobSpecUri = jobSpec.getUri();
- this.jobCatalog.remove(jobSpecUri);
-
- // Refer FlowConfigsResources:delete to understand the pattern of flow URI
- // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI
- if (jobSpec.getConfig().hasPath(DELETE_STATE_STORE_KEY) &&
- Boolean.parseBoolean(jobSpec.getConfig().getString(DELETE_STATE_STORE_KEY))) {
- // Delete the job state if it is a delete spec request
- String[] uriTokens = jobSpecUri.getPath().split("/");
- if (null == this.datasetStateStore) {
- log.warn("Job state store deletion failed as datasetstore is not initialized.");
- continue;
- }
- if (uriTokens.length != 3) {
- log.error("Invalid URI {}.", jobSpecUri);
- continue;
- }
- String jobName = uriTokens[2];
- this.datasetStateStore.delete(jobName);
- log.info("JobSpec {} deleted with statestore.", jobSpecUri);
- } else {
- log.info("JobSpec {} deleted keeping statestore.", jobSpecUri);
- }
- } else {
- this.newSpecs.inc();
- this.jobCatalog.put(jobSpec);
- }
- }
- } catch (IOException ioe) {
- String messageStr = new String(message.message(), Charsets.UTF_8);
- log.error(String.format("Failed to delete job/jobStateStore or parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
+ if (isDelete) {
+ return Lists.newArrayList(Either.<JobSpec, URI>right(jobSpec.getUri()));
+ } else {
+ return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
}
}
}
\ No newline at end of file
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 9d79fc0..6ac9a68 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -106,7 +106,10 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
} else if (parsedMessage instanceof Either.Right) {
this.removedSpecs.inc();
- this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
+ URI jobSpecUri = ((Either.Right<JobSpec, URI>) parsedMessage).getRight();
+ this.jobCatalog.remove(jobSpecUri);
+ // Delete the job state if it is a delete spec request
+ deleteStateStore(jobSpecUri);
}
}
} catch (IOException ioe) {
@@ -115,4 +118,32 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
}
}
+ /**
+ * Deletes the dataset state store of the job whose spec URI is given. The job name is taken from the last
+ * segment of a {@code /<group>/<name>} URI path, the layout produced by
+ * {@code FlowConfigResourceLocalHandler.FlowUriUtils#createFlowSpecUri}. Logs and returns without deleting
+ * anything if no dataset state store is configured or the URI does not have that layout.
+ * @param jobSpecUri URI of the removed job spec
+ * @throws IOException if the state store deletion fails
+ */
+ private void deleteStateStore(URI jobSpecUri) throws IOException {
+ // A "/<group>/<name>" path splits on "/" into ["", group, name].
+ final int expectedNumUriTokens = 3;
+
+ if (null == this.datasetStateStore) {
+ log.warn("Job state store deletion failed as datasetstore is not initialized.");
+ return;
+ }
+ // getPath() is null for opaque URIs; report those as invalid instead of throwing an NPE.
+ String path = jobSpecUri.getPath();
+ String[] uriTokens = path == null ? null : path.split("/");
+ if (uriTokens == null || uriTokens.length != expectedNumUriTokens) {
+ log.error("Invalid URI {}.", jobSpecUri);
+ return;
+ }
+
+ String jobName = uriTokens[expectedNumUriTokens - 1];
+ this.datasetStateStore.delete(jobName);
+ log.info("JobSpec {} deleted with statestore.", jobSpecUri);
+ }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
index d02d861..6181ff9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
@@ -152,7 +152,7 @@ class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactor
FlowId id = FlowConfigUtils.deserializeFlowId(msg);
if (flowCatalogLocalCommit) {
// in balance mode, flow spec is already deleted in flow catalog on standby node.
- URI flowUri = FlowConfigResourceLocalHandler.createFlowSpecUri(id);
+ URI flowUri = FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(id);
log.info("Only handle update {} scheduling because flow catalog is committed locally on standby.", flowUri);
jobScheduler.onDeleteSpec(flowUri, FlowSpec.Builder.DEFAULT_VERSION);
} else {