You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/12/06 02:02:12 UTC

[GitHub] [gobblin] umustafi commented on a diff in pull request #3612: [GOBBLIN-1752] Fix race condition where FSTemplateCatalog would update at the same t…

umustafi commented on code in PR #3612:
URL: https://github.com/apache/gobblin/pull/3612#discussion_r1040339315


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -88,6 +92,10 @@ public void onFileDelete(final Path path) {
   @Override
   public void onCheckDetectedChange() {
     log.info("Detecting change in flowgraph files, reloading flowgraph");
+    if (this.shouldMonitorTemplateCatalog) {
+      // Clear template cache as templates are colocated with the flowgraph, and thus could have been updated too

Review Comment:
   This will trigger a reload of the flow templates on the other monitor, right?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java:
##########
@@ -227,6 +234,22 @@ public void testSharedFlowgraphHelper() throws Exception {
   }
 
   @Test (dependsOnMethods = "testSharedFlowgraphHelper")

Review Comment:
   How does this test the new logic above? Is it testing the new lock used in the monitor changes?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFFlowTemplateCatalogTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+
+@Slf4j
+public class UpdatableFSFFlowTemplateCatalogTest {
+
+  private File templateDir;
+  private Config templateCatalogCfg;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    this.templateDir = Files.createTempDir();
+    FileUtils.forceDeleteOnExit(templateDir);
+    FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()), templateDir);
+    Properties properties = new Properties();
+    properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, templateDir.toURI().toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    this.templateCatalogCfg = config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+  }
+
+  @Test
+  public void testModifyFlowTemplate() throws Exception {
+    UpdatableFSFlowTemplateCatalog catalog = new UpdatableFSFlowTemplateCatalog(this.templateCatalogCfg, new ReentrantReadWriteLock());
+
+    // Check cached flow template is returned
+    FlowTemplate flowTemplate1 = catalog.getFlowTemplate(new URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+    FlowTemplate flowTemplate2 = catalog.getFlowTemplate(new URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+    Assert.assertSame(flowTemplate1, flowTemplate2);
+
+    // Update a file flow catalog and check that the getFlowTemplate returns the new value
+    Path flowConfPath = new File(new File(this.templateDir, FSFlowTemplateCatalogTest.TEST_TEMPLATE_NAME), "flow.conf").toPath();
+    List<String> lines = java.nio.file.Files.readAllLines(flowConfPath);
+    for (int i = 0; i < lines.size(); i++) {
+      if (lines.get(i).equals("gobblin.flow.edge.input.dataset.descriptor.0.format=avro")) {
+        lines.set(i, "gobblin.flow.edge.input.dataset.descriptor.0.format=any");
+        break;
+      }
+    }
+    java.nio.file.Files.write(flowConfPath, lines);
+    catalog.clearTemplates();
+    Function testFunction = new GetFlowTemplateConfigFunction(new URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI), catalog,
+        "gobblin.flow.edge.input.dataset.descriptor.0.format");
+    AssertWithBackoff.create().timeoutMs(10000).assertEquals(testFunction, "any", "flow template updated");
+  }
+
+  @AllArgsConstructor
+  private class GetFlowTemplateConfigFunction implements Function<Void, String> {
+    private URI flowTemplateCatalogUri;
+    private FSFlowTemplateCatalog flowTemplateCatalog;
+    private String configKey;
+
+    @Override
+    public String apply(Void input) {
+      try {
+        return this.flowTemplateCatalog.getFlowTemplate(this.flowTemplateCatalogUri).getRawTemplateConfig().getString(this.configKey);
+      } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) {
+        throw new RuntimeException(e);
+      }

Review Comment:
   Where is this used?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFlowTemplateCatalog.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+/**
+ * {@link FSFlowTemplateCatalog} that keeps a cache of flow and job templates. It provides a public method clearTemplates()
+ * for other classes to invoke, so that other classes can reload the job templates before they make a change. E.g. The
+ * {@link org.apache.gobblin.service.monitoring.FsFlowGraphMonitor} has a configuration to clear the template cache before updating the flowgraph.
+ */
+public class UpdatableFSFlowTemplateCatalog extends FSFlowTemplateCatalog {
+  private final Map<URI, FlowTemplate> flowTemplateMap = new ConcurrentHashMap<>();
+  private final Map<URI, List<JobTemplate>> jobTemplateMap = new ConcurrentHashMap<>();
+  private final ReadWriteLock rwLock;
+
+  public UpdatableFSFlowTemplateCatalog(Config sysConfig, ReadWriteLock rwLock) throws IOException {
+    super(sysConfig);
+    this.rwLock = rwLock;
+  }
+
+  @Override
+  public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
+      throws SpecNotFoundException, JobTemplate.TemplateException, IOException, URISyntaxException {
+    FlowTemplate flowTemplate = flowTemplateMap.getOrDefault(flowTemplateDirURI, null);
+
+    if (flowTemplate == null) {
+      flowTemplate = super.getFlowTemplate(flowTemplateDirURI);
+      flowTemplateMap.put(flowTemplateDirURI, flowTemplate);
+    }
+
+    return flowTemplate;
+  }
+
+  @Override
+  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
+    List<JobTemplate> jobTemplates = jobTemplateMap.getOrDefault(flowTemplateDirURI, null);
+
+    if (jobTemplates == null) {
+      jobTemplates = super.getJobTemplatesForFlow(flowTemplateDirURI);
+      jobTemplateMap.put(flowTemplateDirURI, jobTemplates);
+    }
+
+    return jobTemplates;
+  }
+
+  /**
+   * Clear cached templates so they will be reloaded next time {@link #getFlowTemplate(URI)} is called.
+   */
+  public void clearTemplates() {
+    this.rwLock.writeLock().lock();
+    log.info("Change detected, reloading flow templates.");

Review Comment:
   Should we be more specific? We're actually clearing them here, so let's log "reloading" only when the reload actually happens.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -88,6 +92,10 @@ public void onFileDelete(final Path path) {
   @Override
   public void onCheckDetectedChange() {
     log.info("Detecting change in flowgraph files, reloading flowgraph");
+    if (this.shouldMonitorTemplateCatalog) {
+      // Clear template cache as templates are colocated with the flowgraph, and thus could have been updated too

Review Comment:
   Actually, I see this below. Nice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org