You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/04 20:24:43 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-720] Always delete state store

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d3eecb8  [GOBBLIN-720] Always delete state store
d3eecb8 is described below

commit d3eecb89da66119966cf5494167946dc94032f45
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Apr 4 13:24:37 2019 -0700

    [GOBBLIN-720] Always delete state store
    
    Closes #2587 from
    arjun4084346/alwaysDeleteStateStore
---
 .../service/FlowConfigResourceLocalHandler.java    | 58 +++++++++++++++++----
 .../job_monitor/AvroJobSpecKafkaJobMonitor.java    | 59 ++++------------------
 .../runtime/job_monitor/KafkaJobMonitor.java       | 29 ++++++++++-
 ...ControllerUserDefinedMessageHandlerFactory.java |  2 +-
 4 files changed, 87 insertions(+), 61 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 405e8e3..9b05767 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -61,9 +61,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     log.info("[GAAS-REST] Get called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
 
     try {
-      URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
-      URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
-          "/" + flowId.getFlowGroup() + "/" + flowId.getFlowName(), null, null);
+      URI flowUri = FlowUriUtils.createFlowSpecUri(flowId);
       FlowSpec spec = (FlowSpec) flowCatalog.getSpec(flowUri);
       FlowConfig flowConfig = new FlowConfig();
       Properties flowProps = spec.getConfigAsProperties();
@@ -165,7 +163,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     URI flowUri = null;
 
     try {
-      flowUri = createFlowSpecUri(flowId);
+      flowUri = FlowUriUtils.createFlowSpecUri(flowId);
       this.flowCatalog.remove(flowUri, header, triggerListener);
       return new UpdateResponse(HttpStatus.S_200_OK);
     } catch (URISyntaxException e) {
@@ -180,13 +178,6 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     return deleteFlowConfig(flowId, header, true);
   }
 
-  public static URI createFlowSpecUri (FlowId flowId) throws URISyntaxException {
-    URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
-    URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
-        "/" + flowId.getFlowGroup() + "/" + flowId.getFlowName(), null, null);
-    return flowUri;
-  }
-
   /**
    * Build a {@link FlowSpec} from a {@link FlowConfig}
    * @param flowConfig flow configuration
@@ -236,4 +227,49 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getTemplateUris(), e);
     }
   }
+
+  public static class FlowUriUtils {
+    private final static String URI_SCHEME = "gobblin-flow";
+    private final static String URI_AUTHORITY = null;
+    private final static String URI_PATH_SEPARATOR = "/";
+    private final static String URI_QUERY = null;
+    private final static String URI_FRAGMENT = null;
+    private final static int EXPECTED_NUM_URI_PATH_TOKENS = 3;
+
+    public static URI createFlowSpecUri(FlowId flowId) throws URISyntaxException {
+      return new URI(URI_SCHEME, URI_AUTHORITY, createUriPath(flowId), URI_QUERY, URI_FRAGMENT);
+    }
+
+    private static String createUriPath(FlowId flowId) {
+      return URI_PATH_SEPARATOR + flowId.getFlowGroup() + URI_PATH_SEPARATOR + flowId.getFlowName();
+    }
+
+    /**
+     * Returns the flow name: the last of the three tokens obtained by splitting the URI path on "/".
+     * @param flowUri flow URI with a path of the form "/flowGroup/flowName", as built by {@link #createFlowSpecUri}
+     * @return the flow name, or null (after logging an error) if the path does not split into exactly 3 tokens
+     */
+    public static String getFlowName(URI flowUri) {
+      String[] uriTokens = flowUri.getPath().split("/");
+      if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
+        log.error("Invalid URI {}.", flowUri);
+        return null;
+      }
+      return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 1];
+    }
+
+    /**
+     * Returns the flow group: the middle of the three tokens obtained by splitting the URI path on "/".
+     * @param flowUri flow URI with a path of the form "/flowGroup/flowName", as built by {@link #createFlowSpecUri}
+     * @return the flow group, or null (after logging an error) if the path does not split into exactly 3 tokens
+     */
+    public static String getFlowGroup(URI flowUri) {
+      String[] uriTokens = flowUri.getPath().split("/");
+      if (uriTokens.length != EXPECTED_NUM_URI_PATH_TOKENS) {
+        log.error("Invalid URI {}.", flowUri);
+        return null;
+      }
+      return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 2];
+    }
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index e035326..40e4e48 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Properties;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -37,12 +36,12 @@ import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
 import org.apache.gobblin.runtime.api.JobSpecMonitorFactory;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecExecutor.Verb;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
-import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -57,8 +56,7 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
   public static final String CONFIG_PREFIX = "gobblin.jobMonitor.avroJobSpec";
   public static final String TOPIC_KEY = "topic";
   public static final String SCHEMA_VERSION_READER_CLASS = "versionReaderClass";
-  public static final String DELETE_STATE_STORE_KEY = "delete.state.store";
-
+  protected static final String VERB_KEY = "Verb";
   private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of(
       SCHEMA_VERSION_READER_CLASS, FixedSchemaVersionWriter.class.getName()));
 
@@ -106,9 +104,9 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
   }
 
   /**
-   * Creates {@link JobSpec} from the {@link AvroJobSpec} record.
+   * Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec} record.
    * @param record the record as an {@link AvroJobSpec}
-   * @return a {@link JobSpec} wrapped in a {@link Collection} of {@link Either}
+   * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection} of {@link Either}
    */
   @Override
   public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
@@ -127,52 +125,17 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec>
       }
     }
 
+    String verbName = record.getMetadata().get(VERB_KEY);
+    SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
+
     JobSpec jobSpec = jobSpecBuilder.build();
 
     log.info("Parsed job spec " + jobSpec.toString());
 
-    return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
-  }
-
-  @Override
-  protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
-    try {
-      Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message());
-      for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
-        JobSpec jobSpec = ((Either.Left<JobSpec, URI>)parsedMessage).getLeft();
-        if (jobSpec.getMetadata().get(JobSpec.VERB_KEY).equalsIgnoreCase(SpecExecutor.Verb.DELETE.name())) {
-          this.removedSpecs.inc();
-          URI jobSpecUri = jobSpec.getUri();
-          this.jobCatalog.remove(jobSpecUri);
-
-          // Refer FlowConfigsResources:delete to understand the pattern of flow URI
-          // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI
-          if (jobSpec.getConfig().hasPath(DELETE_STATE_STORE_KEY) &&
-              Boolean.parseBoolean(jobSpec.getConfig().getString(DELETE_STATE_STORE_KEY))) {
-            // Delete the job state if it is a delete spec request
-            String[] uriTokens = jobSpecUri.getPath().split("/");
-            if (null == this.datasetStateStore) {
-              log.warn("Job state store deletion failed as datasetstore is not initialized.");
-              continue;
-            }
-            if (uriTokens.length != 3) {
-              log.error("Invalid URI {}.", jobSpecUri);
-              continue;
-            }
-            String jobName = uriTokens[2];
-            this.datasetStateStore.delete(jobName);
-            log.info("JobSpec {} deleted with statestore.", jobSpecUri);
-          } else {
-            log.info("JobSpec {} deleted keeping statestore.", jobSpecUri);
-          }
-        } else {
-          this.newSpecs.inc();
-          this.jobCatalog.put(jobSpec);
-        }
-      }
-    } catch (IOException ioe) {
-      String messageStr = new String(message.message(), Charsets.UTF_8);
-      log.error(String.format("Failed to delete job/jobStateStore or parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
+    if (verb == Verb.ADD || verb == Verb.UPDATE) {
+      return Lists.newArrayList(Either.left(jobSpec));
+    } else {
+      return Lists.newArrayList(Either.right(jobSpec.getUri()));
     }
   }
 }
\ No newline at end of file
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 9d79fc0..6ac9a68 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -106,7 +106,10 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
           this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
         } else if (parsedMessage instanceof Either.Right) {
           this.removedSpecs.inc();
-          this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight());
+          URI jobSpecUri = ((Either.Right<JobSpec, URI>) parsedMessage).getRight();
+          this.jobCatalog.remove(jobSpecUri);
+          // Delete the job state if it is a delete spec request
+          deleteStateStore(jobSpecUri);
         }
       }
     } catch (IOException ioe) {
@@ -115,4 +118,28 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
     }
   }
 
+  /**
+   * Takes the last path token of the given jobSpecUri as the job name and calls
+   * datasetStateStore.delete on it; logs and returns if the store is null or the URI is malformed.
+   * @param jobSpecUri URI whose path must split on "/" into exactly 3 tokens, e.g. one built by
+   *                   {@link FlowConfigResourceLocalHandler.FlowUriUtils#createFlowSpecUri}
+   * @throws IOException if the state store deletion fails
+   */
+  private void deleteStateStore(URI jobSpecUri) throws IOException {
+    int EXPECTED_NUM_URI_TOKENS = 3;
+    String[] uriTokens = jobSpecUri.getPath().split("/");
+
+    if (null == this.datasetStateStore) {
+      log.warn("Job state store deletion failed as datasetstore is not initialized.");
+      return;
+    }
+    if (uriTokens.length != EXPECTED_NUM_URI_TOKENS) {
+      log.error("Invalid URI {}.", jobSpecUri);
+      return;
+    }
+
+    String jobName = uriTokens[EXPECTED_NUM_URI_TOKENS - 1];
+    this.datasetStateStore.delete(jobName);
+    log.info("JobSpec {} deleted with statestore.", jobSpecUri);
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
index d02d861..6181ff9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
@@ -152,7 +152,7 @@ class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactor
         FlowId id = FlowConfigUtils.deserializeFlowId(msg);
         if (flowCatalogLocalCommit) {
           // in balance mode, flow spec is already deleted in flow catalog on standby node.
-          URI flowUri = FlowConfigResourceLocalHandler.createFlowSpecUri(id);
+          URI flowUri = FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri(id);
           log.info("Only handle update {} scheduling because flow catalog is committed locally on standby.", flowUri);
           jobScheduler.onDeleteSpec(flowUri, FlowSpec.Builder.DEFAULT_VERSION);
         } else {