You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/06/11 21:23:32 UTC
[1/2] incubator-gobblin git commit: [GOBBLIN-505] Implement a
Git-based FlowGraph Monitor.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 2c5e25d98 -> 6b1201852
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
new file mode 100644
index 0000000..43fa9a3
--- /dev/null
+++ b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.dircache.DirCache;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.RepositoryCache;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.util.FS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+public class GitFlowGraphMonitorTest {
+  // Log under this test class's own category; the original used the category of the class under test.
+  private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitorTest.class);
+
+  // Bare repository created in setUp(); the clone below pushes to it as "origin".
+  private Repository remoteRepo;
+  // Handle on the working clone, used to add, commit and push test changes.
+  private Git gitForPush;
+
+  // Root of everything this test writes; removed in setUp() and tearDown().
+  private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
+  private final File remoteDir = new File(TEST_DIR + "/remote");
+  private final File cloneDir = new File(TEST_DIR + "/clone");
+  // Directory inside the clone that holds the node and edge files.
+  private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
+
+  // Node files live at <flowGraphDir>/<node dir>/<node file>.
+  private static final String NODE_1_FILE = "node1.properties";
+  private final File node1Dir = new File(flowGraphDir, "node1");
+  private final File node1File = new File(node1Dir, NODE_1_FILE);
+  private static final String NODE_2_FILE = "node2.properties";
+  private final File node2Dir = new File(flowGraphDir, "node2");
+  private final File node2File = new File(node2Dir, NODE_2_FILE);
+
+  // The node1 -> node2 edge file lives at <flowGraphDir>/node1/node2/edge1.properties.
+  private final File edge1Dir = new File(node1Dir, "node2");
+  private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+  // Ref spec used for every push in this test.
+  private RefSpec masterRefSpec = new RefSpec("master");
+  private FSFlowCatalog flowCatalog;
+  // Configuration handed to the monitor under test; built in setUp().
+  private Config config;
+  private BaseFlowGraph flowGraph;
+  private GitFlowGraphMonitor gitFlowGraphMonitor;
+
+  /**
+   * Prepares the fixture shared by all tests: a clean test directory, a bare "remote" repository,
+   * a working clone with one empty commit pushed to it, a template catalog, an empty flow graph
+   * and an activated {@link GitFlowGraphMonitor} wired to all of them.
+   *
+   * @throws Exception if any of the repository, catalog or monitor set-up steps fails
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(TEST_DIR);
+
+    // Create the bare repository that acts as the remote.
+    RepositoryCache.FileKey remoteKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
+    this.remoteRepo = remoteKey.open(false);
+    this.remoteRepo.create(true);
+    String remotePath = this.remoteRepo.getDirectory().getAbsolutePath();
+
+    // Clone it; all later test changes are committed in this clone and pushed back.
+    this.gitForPush = Git.cloneRepository()
+        .setURI(remotePath)
+        .setDirectory(cloneDir)
+        .call();
+
+    // Push an empty commit so there is a base revision to detect changes against.
+    this.gitForPush.commit().setMessage("First commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    // Monitor settings are namespaced under the flow graph monitor prefix.
+    String monitorPrefix = GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + ".";
+    this.config = ConfigBuilder.create()
+        .addPrimitive(monitorPrefix + ConfigurationKeys.GIT_MONITOR_REPO_URI, remotePath)
+        .addPrimitive(monitorPrefix + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
+        .addPrimitive(monitorPrefix + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
+        .build();
+
+    // Build the template catalog from the "template_catalog" classpath resource.
+    URI templateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties catalogProps = new Properties();
+    catalogProps.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, templateCatalogUri.toString());
+    Config catalogBaseConfig = ConfigFactory.parseProperties(catalogProps);
+    Config templateCatalogCfg = catalogBaseConfig.withValue(
+        ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        catalogBaseConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+    // Start from a flow graph created with defaults.
+    this.flowGraph = new BaseFlowGraph();
+
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph);
+    this.gitFlowGraphMonitor.setActive(true);
+  }
+
+  /**
+   * Writes a data node properties file into the working clone, pushes it to the remote, runs one
+   * monitor pass and verifies that the node is present in the flow graph with the expected values.
+   * The file is rewritten on every call, so calling this for an existing node acts as an update.
+   *
+   * @param nodeDir directory of the node inside the flow graph directory
+   * @param nodeFile properties file describing the node
+   * @param nodeId identifier under which the node is looked up in the flow graph
+   * @param paramValue value written for, and expected back from, the "param1" property
+   * @throws IOException if writing the node file or processing the changes fails
+   * @throws GitAPIException if the add, commit or push fails
+   */
+  private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue)
+      throws IOException, GitAPIException {
+    // Write (or overwrite) the node file in the working clone.
+    nodeDir.mkdirs();
+    nodeFile.createNewFile();
+    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
+
+    // add, commit, push node
+    this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call();
+    this.gitForPush.commit().setMessage("Node commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    // Check that the node has been added to the FlowGraph. Fail with a clear assertion, not an
+    // NPE, when the monitor did not register it.
+    DataNode dataNode = this.flowGraph.getNode(nodeId);
+    Assert.assertNotNull(dataNode, "Node " + nodeId + " was not added to the flow graph");
+    Assert.assertEquals(dataNode.getId(), nodeId);
+    Assert.assertTrue(dataNode.isActive());
+    Assert.assertEquals(dataNode.getProps().getString("param1"), paramValue);
+  }
+
+  /**
+   * Adds node1 and node2, one after the other, and checks after each push that the node shows
+   * up in the flow graph.
+   */
+  @Test
+  public void testAddNode()
+      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+    // node1 is pushed and verified first, then node2.
+    testAddNodeHelper(node1Dir, node1File, "node1", "value1");
+    testAddNodeHelper(node2Dir, node2File, "node2", "value2");
+  }
+
+ @Test (dependsOnMethods = "testAddNode")
+ public void testAddEdge()
+ throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+ // push a new node file
+ this.edge1Dir.mkdirs();
+ this.edge1File.createNewFile();
+
+ Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
+ + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
+
+ // add, commit, push
+ this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
+ this.gitForPush.commit().setMessage("Edge commit").call();
+ this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+ this.gitFlowGraphMonitor.processGitConfigChanges();
+
+ //Check if edge1 has been added to the FlowGraph
+ Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+ Assert.assertEquals(edgeSet.size(), 1);
+ FlowEdge flowEdge = edgeSet.iterator().next();
+ Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
+ Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+ Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+ Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+ }
+
+  /**
+   * Rewrites the edge1 file with an additional "key1" property, pushes the change, runs one
+   * monitor pass and verifies that the flow graph holds a single, updated edge1.
+   *
+   * <p>Depends on {@code testAddEdge}, not just {@code testAddNode}: the edge directory and file
+   * are created there. With both methods depending only on {@code testAddNode}, TestNG is free
+   * to run this one first, in which case the write below fails because the directory is missing.
+   */
+  @Test (dependsOnMethods = "testAddEdge")
+  public void testUpdateEdge()
+      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+    //Update edge1 file: same content as in testAddEdge plus the extra key1 property
+    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + "=FS:///test-template/flow.conf\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n"
+        + "key1=value1\n", edge1File, Charsets.UTF_8);
+
+    // add, commit, push
+    this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check that the updated edge1 replaced the old one: still exactly one edge out of node1
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
+    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1");
+    Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2");
+    Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor");
+    // The property added by this update must be visible on the edge.
+    Assert.assertEquals(flowEdge.getProps().getString("key1"), "value1");
+  }
+
+  /**
+   * Rewrites node1 with a new "param1" value and checks that the flow graph reflects the
+   * updated node.
+   */
+  @Test (dependsOnMethods = "testUpdateEdge")
+  public void testUpdateNode()
+      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
+    // The add helper overwrites the existing node file, so it doubles as the update path.
+    testAddNodeHelper(node1Dir, node1File, "node1", "value3");
+  }
+
+
+  /**
+   * Deletes the edge1 file, pushes the removal, runs one monitor pass and verifies that node1
+   * no longer has any outgoing edge in the flow graph.
+   *
+   * @throws GitAPIException if the rm, commit or push fails
+   * @throws IOException if processing the changes fails
+   */
+  @Test (dependsOnMethods = "testUpdateNode")
+  public void testRemoveEdge() throws GitAPIException, IOException {
+    // delete the edge file from the working clone
+    edge1File.delete();
+
+    //Node1 has 1 edge before delete
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+
+    // delete, commit, push (the command results are not needed, so they are not kept)
+    this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
+        this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge remove commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if edge1 has been deleted from the graph; assertEquals reports the actual size on failure
+    edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 0);
+  }
+
+  /**
+   * Deletes the node1 file, pushes the removal, runs one monitor pass and verifies that node1
+   * is no longer present in the flow graph.
+   *
+   * @throws GitAPIException if the rm, commit or push fails
+   * @throws IOException if processing the changes fails
+   */
+  @Test (dependsOnMethods = "testRemoveEdge")
+  public void testRemoveNode() throws GitAPIException, IOException {
+    //delete node file
+    node1File.delete();
+
+    //node1 is present in the graph before delete
+    DataNode node1 = this.flowGraph.getNode("node1");
+    Assert.assertNotNull(node1);
+
+    // delete, commit, push (the command results are not needed, so they are not kept)
+    this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call();
+    this.gitForPush.commit().setMessage("Node remove commit").call();
+    this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if node1 has been deleted from the graph
+    node1 = this.flowGraph.getNode("node1");
+    Assert.assertNull(node1);
+  }
+
+
+  /**
+   * Best-effort recursive delete of a directory. Makes up to five attempts, logging a warning
+   * for each failed one, and gives up silently if none succeeds.
+   *
+   * @param dir path of the directory to remove; a path that does not exist is a no-op
+   */
+  private void cleanUpDir(String dir) {
+    File target = new File(dir);
+
+    // cleanup is flaky on Travis, so retry a few times and then suppress the error if unsuccessful
+    int attemptsLeft = 5;
+    while (attemptsLeft > 0) {
+      attemptsLeft--;
+      try {
+        if (target.exists()) {
+          FileUtils.deleteDirectory(target);
+        }
+        // nothing left to delete, so stop retrying
+        return;
+      } catch (IOException e) {
+        logger.warn("Cleanup delete directory failed for directory: " + dir, e);
+      }
+    }
+  }
+
+  /**
+   * Builds the repository-relative path of a node file, for use as a JGit file pattern.
+   *
+   * <p>JGit file patterns always use {@code /} as the separator, regardless of platform, so the
+   * path is joined with {@code /} rather than the platform file separator.
+   *
+   * @param groupDir name of the node directory
+   * @param fileName name of the node file
+   * @return {@code <flow graph dir name>/<groupDir>/<fileName>}
+   */
+  private String formNodeFilePath(String groupDir, String fileName) {
+    return this.flowGraphDir.getName() + "/" + groupDir + "/" + fileName;
+  }
+
+  /**
+   * Builds the repository-relative path of an edge file, for use as a JGit file pattern.
+   *
+   * <p>JGit file patterns always use {@code /} as the separator, regardless of platform, so the
+   * path is joined with {@code /} rather than the platform file separator.
+   *
+   * @param parentDir name of the directory that contains the edge directory
+   * @param groupDir name of the edge directory
+   * @param fileName name of the edge file
+   * @return {@code <flow graph dir name>/<parentDir>/<groupDir>/<fileName>}
+   */
+  private String formEdgeFilePath(String parentDir, String groupDir, String fileName) {
+    return this.flowGraphDir.getName() + "/" + parentDir + "/" + groupDir + "/" + fileName;
+  }
+
+  /**
+   * Releases the Git handles opened in setUp() and then removes the test directory.
+   *
+   * <p>The handles are closed first so that no repository files are still held open while the
+   * directory is deleted. The null checks cover the case where setUp() failed part-way. The
+   * directory clean-up runs even if closing throws.
+   *
+   * @throws Exception if closing a Git handle fails
+   */
+  @AfterClass
+  public void tearDown() throws Exception {
+    try {
+      if (this.gitForPush != null) {
+        this.gitForPush.close();
+      }
+      if (this.remoteRepo != null) {
+        this.remoteRepo.close();
+      }
+    } finally {
+      cleanUpDir(TEST_DIR);
+    }
+  }
+}
\ No newline at end of file
[2/2] incubator-gobblin git commit: [GOBBLIN-505] Implement a
Git-based FlowGraph Monitor.
Posted by hu...@apache.org.
[GOBBLIN-505] Implement a Git-based FlowGraph Monitor.
Closes #2382 from sv2000/gitGraphMonitor
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6b120185
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6b120185
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6b120185
Branch: refs/heads/master
Commit: 6b1201852aa71b904da5f8a49a550e8596fe59fc
Parents: 2c5e25d
Author: suvasude <su...@linkedin.biz>
Authored: Mon Jun 11 14:23:21 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Jun 11 14:23:21 2018 -0700
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 25 +-
.../service/modules/core/GitConfigMonitor.java | 325 ++----------------
.../modules/core/GitFlowGraphMonitor.java | 309 ++++++++++++++++++
.../modules/core/GitMonitoringService.java | 326 +++++++++++++++++++
.../service/modules/flowgraph/BaseFlowEdge.java | 51 +--
.../modules/flowgraph/FlowEdgeFactory.java | 13 +-
.../flowgraph/FlowGraphConfigurationKeys.java | 7 +-
.../modules/core/GitConfigMonitorTest.java | 21 +-
.../modules/core/GobblinServiceManagerTest.java | 14 +-
.../modules/flowgraph/BaseFlowGraphTest.java | 46 +--
.../modules/core/GitFlowGraphMonitorTest.java | 314 ++++++++++++++++++
11 files changed, 1059 insertions(+), 392 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b6196da..50e6020 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -103,7 +103,7 @@ public class ConfigurationKeys {
public static final String JOB_CONFIG_FILE_DIR_KEY = "jobconf.dir";
// Path where all job configuration files stored
- public static final String JOB_CONFIG_FILE_GENERAL_PATH_KEY = "jobconf.fullyQualifiedPath" ;
+ public static final String JOB_CONFIG_FILE_GENERAL_PATH_KEY = "jobconf.fullyQualifiedPath";
// Job configuration file extensions
public static final String JOB_CONFIG_FILE_EXTENSIONS_KEY = "jobconf.extensions";
public static final String DEFAULT_JOB_CONFIG_FILE_EXTENSIONS = "pull,job";
@@ -208,13 +208,13 @@ public class ConfigurationKeys {
public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1);
/** Optional, for user to specified which template to use, inside .job file */
- public static final String JOB_TEMPLATE_PATH = "job.template" ;
+ public static final String JOB_TEMPLATE_PATH = "job.template";
/**
* Configuration property used only for job configuration file's tempalte, inside .template file
*/
public static final String REQUIRED_ATRRIBUTES_LIST = "gobblin.template.required_attributes";
- public static final String JOB_DEPENDENCIES="dependencies";
+ public static final String JOB_DEPENDENCIES = "dependencies";
/**
* Configuration for emitting job events
@@ -250,7 +250,7 @@ public class ConfigurationKeys {
*/
// This property is used to specify the URN of a dataset a job or WorkUnit extracts data for
public static final String DATASET_URN_KEY = "dataset.urn";
- public static final String GLOBAL_WATERMARK_DATASET_URN="__globalDatasetWatermark";
+ public static final String GLOBAL_WATERMARK_DATASET_URN = "__globalDatasetWatermark";
public static final String DEFAULT_DATASET_URN = "";
/**
@@ -900,16 +900,11 @@ public class ConfigurationKeys {
public static final String CONFIG_BASED_PREFIX = "gobblin.configBased";
/**
- * Configuration related to the git flow config monitoring service
+ * Configuration related to the Git based monitoring service
*/
- public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor.";
- public static final String GIT_CONFIG_MONITOR_REPO_URI = GIT_CONFIG_MONITOR_PREFIX + "repositoryUri";
- public static final String GIT_CONFIG_MONITOR_REPO_DIR = GIT_CONFIG_MONITOR_PREFIX + "repositoryDirectory";
- public static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config";
- public static final String GIT_CONFIG_MONITOR_CONFIG_DIR = GIT_CONFIG_MONITOR_PREFIX + "configDirectory";
- public static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config";
- public static final String GIT_CONFIG_MONITOR_POLLING_INTERVAL = GIT_CONFIG_MONITOR_PREFIX + "pollingInterval";
- public static final String GIT_CONFIG_MONITOR_BRANCH_NAME = GIT_CONFIG_MONITOR_PREFIX + "branchName";
- public static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master";
- public static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
+ public static final String GIT_MONITOR_REPO_URI = "repositoryUri";
+ public static final String GIT_MONITOR_REPO_DIR = "repositoryDirectory";
+ public static final String GIT_MONITOR_CONFIG_BASE_DIR = "configBaseDirectory";
+ public static final String GIT_MONITOR_POLLING_INTERVAL = "pollingInterval";
+ public static final String GIT_MONITOR_BRANCH_NAME = "branchName";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 09d7bb4..9fd0555 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -16,34 +16,13 @@
*/
package org.apache.gobblin.service.modules.core;
-import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.api.ResetCommand;
-import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.diff.DiffEntry;
-import org.eclipse.jgit.errors.RepositoryNotFoundException;
-import org.eclipse.jgit.lib.ObjectId;
-import org.eclipse.jgit.lib.ObjectReader;
-import org.eclipse.jgit.revwalk.RevCommit;
-import org.eclipse.jgit.treewalk.CanonicalTreeParser;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
-import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
@@ -55,11 +34,8 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PullFileLoader;
-
/**
* Service that monitors for jobs from a git repository.
* The git repository must have an inital commit that has no config files since that is used as a base for getting
@@ -69,148 +45,54 @@ import org.apache.gobblin.util.PullFileLoader;
* The <flowGroup> and <flowName> is used to generate the URI used to store the config in the {@link FlowCatalog}
*/
@Slf4j
-public class GitConfigMonitor extends AbstractIdleService {
+public class GitConfigMonitor extends GitMonitoringService {
+ public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor";
+
private static final String SPEC_DESCRIPTION = "Git-based flow config";
private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
- private static final int TERMINATION_TIMEOUT = 30;
+ private static final String PROPERTIES_EXTENSIONS = "pull,job";
+ private static final String CONF_EXTENSIONS = "json,conf";
+ private static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config";
+ private static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config";
+ private static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master";
+
private static final int CONFIG_FILE_DEPTH = 3;
- private static final String REMOTE_NAME = "origin";
+ private static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
+
+ private static final Config DEFAULT_FALLBACK =
+ ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR)
+ .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR)
+ .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME)
+ .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL)
+ .put(GitMonitoringService.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+ .put(GitMonitoringService.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+ .build());
- private final ScheduledExecutorService scheduledExecutor;
- private final GitRepository gitRepo;
- private final int pollingInterval;
- private final String repositoryDir;
- private final String configDir;
- private final Path configDirPath;
private final FlowCatalog flowCatalog;
- private final PullFileLoader pullFileLoader;
private final Config emptyConfig = ConfigFactory.empty();
- private volatile boolean isActive = false;
- /**
- * Create a {@link GitConfigMonitor} that monitors a git repository for changes and manages config in a
- * {@link FlowCatalog}
- * @param config configuration
- * @param flowCatalog the flow catalog
- */
GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
+ super(config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
this.flowCatalog = flowCatalog;
-
- this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
- ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
-
- Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI),
- ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI + " needs to be specified.");
-
- String repositoryUri = config.getString(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI);
-
- this.repositoryDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR,
- ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR);
-
- this.configDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_CONFIG_DIR,
- ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR);
-
- this.pollingInterval = ConfigUtils.getInt(config, ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL,
- ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL);
-
- String branchName = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_BRANCH_NAME,
- ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME);
-
- this.configDirPath = new Path(this.repositoryDir, this.configDir);
-
- try {
- this.pullFileLoader = new PullFileLoader(this.configDirPath,
- FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()),
- PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS, PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
- } catch (IOException e) {
- throw new RuntimeException("Could not create pull file loader", e);
- }
-
- try {
- this.gitRepo = new GitRepository(repositoryUri, this.repositoryDir, branchName);
- } catch (GitAPIException | IOException e) {
- throw new RuntimeException("Could not open git repository", e);
- }
- }
-
- /** Start the service. */
- @Override
- protected void startUp() throws Exception {
- log.info("Starting the " + GitConfigMonitor.class.getSimpleName());
- log.info("Polling git with inteval {} ", this.pollingInterval);
-
- // Schedule the job config fetch task
- this.scheduledExecutor.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- processGitConfigChanges();
- } catch (GitAPIException | IOException e) {
- log.error("Failed to process git config changes", e);
- // next run will try again since errors could be intermittent
- }
- }
- }, 0, this.pollingInterval, TimeUnit.SECONDS);
}
- /** Stop the service. */
@Override
- protected void shutDown() throws Exception {
- this.scheduledExecutor.shutdown();
- this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
- }
-
- public synchronized void setActive(boolean isActive) {
- if (this.isActive == isActive) {
- // No-op if already in correct state
- return;
- }
-
- this.isActive = isActive;
- }
-
- /**
- * Fetch the list of changes since the last refresh of the repository and apply the changes to the {@link FlowCatalog}
- * @throws GitAPIException
- * @throws IOException
- */
- @VisibleForTesting
- void processGitConfigChanges() throws GitAPIException, IOException {
+ public boolean shouldPollGit() {
// if not active or if the flow catalog is not up yet then can't process config changes
if (!isActive || !this.flowCatalog.isRunning()) {
- log.info("GitConfigMonitor: skip poll since the JobCatalog is not yet running.");
- return;
- }
-
- List<DiffEntry> changes = this.gitRepo.getChanges();
-
- for (DiffEntry change : changes) {
- switch (change.getChangeType()) {
- case ADD:
- case MODIFY:
- addSpec(change);
- break;
- case DELETE:
- removeSpec(change);
- break;
- case RENAME:
- removeSpec(change);
- addSpec(change);
- break;
- default:
- throw new RuntimeException("Unsupported change type " + change.getChangeType());
- }
+ log.warn("GitConfigMonitor: skip poll since the JobCatalog is not yet running. isActive = {}", this.isActive);
+ return false;
}
-
- // Done processing changes, so checkpoint
- this.gitRepo.moveCheckpointAndHashesForward();
+ return true;
}
/**
* Add a {@link FlowSpec} for an added, updated, or modified flow config
* @param change
*/
- private void addSpec(DiffEntry change) {
+ @Override
+ public void addChange(DiffEntry change) {
if (checkConfigFilePath(change.getNewPath())) {
Path configFilePath = new Path(this.repositoryDir, change.getNewPath());
@@ -232,7 +114,8 @@ public class GitConfigMonitor extends AbstractIdleService {
* remove a {@link FlowSpec} for a deleted or renamed flow config
* @param change
*/
- private void removeSpec(DiffEntry change) {
+ @Override
+ public void removeChange(DiffEntry change) {
if (checkConfigFilePath(change.getOldPath())) {
Path configFilePath = new Path(this.repositoryDir, change.getOldPath());
String flowName = FSSpecStore.getSpecName(configFilePath);
@@ -265,10 +148,10 @@ public class GitConfigMonitor extends AbstractIdleService {
Path configFile = new Path(configFilePath);
String fileExtension = Files.getFileExtension(configFile.getName());
- if (configFile.depth() != CONFIG_FILE_DEPTH ||
- !configFile.getParent().getParent().getName().equals(configDir) ||
- !(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension) ||
- PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension))) {
+ if (configFile.depth() != CONFIG_FILE_DEPTH
+ || !configFile.getParent().getParent().getName().equals(folderName)
+ || !(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension)
+ || PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension))) {
log.warn("Changed file does not conform to directory structure and file name format, skipping: "
+ configFilePath);
@@ -292,144 +175,4 @@ public class GitConfigMonitor extends AbstractIdleService {
return flowConfig.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup));
}
-
- /**
- * Class for managing a git repository
- */
- private static class GitRepository {
- private final static String CHECKPOINT_FILE = "checkpoint.txt";
- private final static String CHECKPOINT_FILE_TMP = "checkpoint.tmp";
- private final String repoUri;
- private final String repoDir;
- private final String branchName;
- private Git git;
- private String lastProcessedGitHash;
- private String latestGitHash;
-
- /**
- * Create an object to manage the git repository stored locally at repoDir with a repository URI of repoDir
- * @param repoUri URI of repository
- * @param repoDir Directory to hold the local copy of the repository
- * @param branchName Branch name
- * @throws GitAPIException
- * @throws IOException
- */
- private GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException {
- this.repoUri = repoUri;
- this.repoDir = repoDir;
- this.branchName = branchName;
-
- initRepository();
- }
-
- /**
- * Open the repository if it exists locally, otherwise clone it
- * @throws GitAPIException
- * @throws IOException
- */
- private void initRepository() throws GitAPIException, IOException {
- File repoDirFile = new File(this.repoDir);
-
- try {
- this.git = Git.open(repoDirFile);
-
- String uri = this.git.getRepository().getConfig().getString("remote", REMOTE_NAME, "url");
-
- if (!uri.equals(this.repoUri)) {
- throw new RuntimeException("Repo at " + this.repoDir + " has uri " + uri + " instead of " + this.repoUri);
- }
- } catch (RepositoryNotFoundException e) {
- // if the repository was not found then clone a new one
- this.git = Git.cloneRepository()
- .setDirectory(repoDirFile)
- .setURI(this.repoUri)
- .setBranch(this.branchName)
- .call();
- }
-
- try {
- this.lastProcessedGitHash = readCheckpoint();
- } catch (FileNotFoundException e) {
- // if no checkpoint is available then start with the first commit
- Iterable<RevCommit> logs = git.log().call();
- RevCommit lastLog = null;
-
- for (RevCommit log : logs) {
- lastLog = log;
- }
-
- if (lastLog != null) {
- this.lastProcessedGitHash = lastLog.getName();
- }
- }
-
- this.latestGitHash = this.lastProcessedGitHash;
- }
-
- /**
- * Read the last processed commit githash from the checkpoint file
- * @return
- * @throws IOException
- */
- private String readCheckpoint() throws IOException {
- File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE);
- return Files.toString(checkpointFile, Charsets.UTF_8);
- }
-
- /**
- * Write the last processed commit githash to the checkpoint file
- * @param gitHash
- * @throws IOException
- */
- private void writeCheckpoint(String gitHash) throws IOException {
- // write to a temporary name then rename to make the operation atomic when the file system allows a file to be
- // replaced
- File tmpCheckpointFile = new File(this.repoDir, CHECKPOINT_FILE_TMP);
- File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE);
-
- Files.write(gitHash, tmpCheckpointFile, Charsets.UTF_8);
-
- Files.move(tmpCheckpointFile, checkpointFile);
- }
-
- private void moveCheckpointAndHashesForward() throws IOException {
- this.lastProcessedGitHash = this.latestGitHash;
-
- writeCheckpoint(this.latestGitHash);
- }
-
- /**
- *
- * @throws GitAPIException
- * @throws IOException
- */
- private List<DiffEntry> getChanges() throws GitAPIException, IOException {
- // get tree for last processed commit
- ObjectId oldHeadTree = git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}");
-
- // refresh to latest and reset hard to handle forced pushes
- this.git.fetch().setRemote(REMOTE_NAME).call();
- // reset hard to get a clean working set since pull --rebase may leave files around
- this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME + "/" + this.branchName).call();
-
- ObjectId head = this.git.getRepository().resolve("HEAD");
- ObjectId headTree = this.git.getRepository().resolve("HEAD^{tree}");
-
- // remember the hash for the current HEAD. This will be checkpointed after the diff is processed.
- latestGitHash = head.getName();
-
- // diff old and new heads to find changes
- ObjectReader reader = this.git.getRepository().newObjectReader();
- CanonicalTreeParser oldTreeIter = new CanonicalTreeParser();
- oldTreeIter.reset(reader, oldHeadTree);
- CanonicalTreeParser newTreeIter = new CanonicalTreeParser();
- newTreeIter.reset(reader, headTree);
-
- return this.git.diff()
- .setNewTree(newTreeIter)
- .setOldTree(oldTreeIter)
- .setShowNameAndStatusOnly(true)
- .call();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
new file mode 100644
index 0000000..c7dd226
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.diff.DiffEntry;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Service that monitors for changes to {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph} from a git repository.
+ * The git repository must have an initial commit that has no files since that is used as a base for getting
+ * the change list.
+ * The {@link DataNode}s and {@link FlowEdge}s in FlowGraph need to be organized with the following directory structure on git:
+ * <root_flowGraph_dir>/<nodeName>/<nodeName>.properties
+ * <root_flowGraph_dir>/<nodeName1>/<nodeName2>/<edgeName>.properties
+ */
+@Slf4j
+public class GitFlowGraphMonitor extends GitMonitoringService {
+ public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = "gitFlowGraphMonitor";
+
+ private static final String PROPERTIES_EXTENSIONS = "properties";
+ private static final String CONF_EXTENSIONS = StringUtils.EMPTY;
+ private static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":";
+ private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
+ private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+ private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = "master";
+
+ private static final int NODE_FILE_DEPTH = 3;
+ private static final int EDGE_FILE_DEPTH = 4;
+ private static final int DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL = 60;
+
+ private static final Config DEFAULT_FALLBACK =
+ ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
+ .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+ .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
+ .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
+ .put(JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+ .put(HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+ .build());
+
+ private FSFlowCatalog flowCatalog;
+ private FlowGraph flowGraph;
+ private final Config emptyConfig = ConfigFactory.empty();
+
+ /**
+ * Create a monitor that applies node and edge file changes from a git repository to a {@link FlowGraph}.
+ * @param config configuration whose {@code gitFlowGraphMonitor} section (with {@code DEFAULT_FALLBACK} as
+ * fallback for unset keys) is handed to the {@link GitMonitoringService} constructor
+ * @param flowCatalog catalog passed to the {@link FlowEdgeFactory} when a {@link FlowEdge} is built
+ * @param graph the {@link FlowGraph} that nodes and edges are added to and removed from
+ */
+ public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph) {
+ super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
+ this.flowCatalog = flowCatalog;
+ this.flowGraph = graph;
+ }
+
+ /**
+ * Determine if the service should poll Git.
+ * @return the current value of the {@code isActive} flag; no other condition is checked here
+ */
+ @Override
+ public boolean shouldPollGit() {
+ return this.isActive;
+ }
+
+ /**
+ * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to
+ * the {@link FlowGraph} for an added, updated or modified node or edge file.
+ * @param change
+ */
+ @Override
+ public void addChange(DiffEntry change) {
+ Path path = new Path(change.getNewPath());
+ if (path.depth() == NODE_FILE_DEPTH) {
+ addDataNode(change);
+ } else if (path.depth() == EDGE_FILE_DEPTH) {
+ addFlowEdge(change);
+ }
+ }
+
+ /**
+ * Remove an element (i.e. either a {@link DataNode} or a {@link FlowEdge} from the {@link FlowGraph} for
+ * a renamed or deleted {@link DataNode} or {@link FlowEdge} file.
+ * @param change
+ */
+ @Override
+ public void removeChange(DiffEntry change) {
+ Path path = new Path(change.getOldPath());
+ if (path.depth() == NODE_FILE_DEPTH) {
+ removeDataNode(change);
+ } else if (path.depth() == EDGE_FILE_DEPTH) {
+ removeFlowEdge(change);
+ }
+ }
+
+  /**
+   * Add a {@link DataNode} to the {@link FlowGraph}. The node file is loaded, its
+   * {@link FlowGraphConfigurationKeys#DATA_NODE_ID_KEY} is overridden with the name of the file's parent directory,
+   * and the class named by {@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} (or the default class) is instantiated
+   * from the resulting config.
+   * Files that fail the path check are skipped; any exception while loading or instantiating is logged and the
+   * change is skipped so the remaining changes can still be processed.
+   * @param change the diff entry whose new path points to the node file
+   */
+  private void addDataNode(DiffEntry change) {
+    if (!checkFilePath(change.getNewPath(), NODE_FILE_DEPTH)) {
+      return;
+    }
+    Path nodeFilePath = new Path(this.repositoryDir, change.getNewPath());
+    try {
+      Config config = loadNodeFileWithOverrides(nodeFilePath);
+      Class<?> dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
+          FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+      DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config);
+      if (!this.flowGraph.addDataNode(dataNode)) {
+        log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId());
+      }
+    } catch (Exception e) {
+      // the trailing throwable makes SLF4J record the stack trace in addition to the formatted message
+      log.warn("Could not add DataNode defined in {} due to exception {}", change.getNewPath(), e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Remove a {@link DataNode} from the {@link FlowGraph}. The node id is derived from the name of the parent
+   * directory of the removed file (the file itself is not read) and is used to delete the associated
+   * {@link DataNode}.
+   * Like the other change handlers, any exception is logged and the change is skipped, so a single bad entry
+   * does not abort processing of the remaining changes.
+   * @param change the diff entry whose old path points to the removed node file
+   */
+  private void removeDataNode(DiffEntry change) {
+    if (!checkFilePath(change.getOldPath(), NODE_FILE_DEPTH)) {
+      return;
+    }
+    Path nodeFilePath = new Path(this.repositoryDir, change.getOldPath());
+    try {
+      Config config = getNodeConfigWithOverrides(ConfigFactory.empty(), nodeFilePath);
+      String nodeId = config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY);
+      if (!this.flowGraph.deleteDataNode(nodeId)) {
+        log.warn("Could not remove DataNode {} from FlowGraph; skipping", nodeId);
+      }
+    } catch (Exception e) {
+      // the trailing throwable makes SLF4J record the stack trace in addition to the formatted message
+      log.warn("Could not remove DataNode defined in {} due to exception {}", nodeFilePath, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Add a {@link FlowEdge} to the {@link FlowGraph}. The edge file is loaded, its source, destination and id
+   * properties are overridden with values derived from the file path, and the {@link FlowEdgeFactory} named by
+   * {@link FlowGraphConfigurationKeys#FLOW_EDGE_FACTORY_CLASS} (or the default factory) builds the
+   * {@link FlowEdge} from the resulting config.
+   * Files that fail the path check are skipped; any exception is logged and the change is skipped so the
+   * remaining changes can still be processed.
+   * @param change the diff entry whose new path points to the edge file
+   */
+  private void addFlowEdge(DiffEntry change) {
+    if (!checkFilePath(change.getNewPath(), EDGE_FILE_DEPTH)) {
+      return;
+    }
+    Path edgeFilePath = new Path(this.repositoryDir, change.getNewPath());
+    try {
+      Config config = loadEdgeFileWithOverrides(edgeFilePath);
+      Class<?> flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(config,
+          FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
+          FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
+      FlowEdgeFactory flowEdgeFactory =
+          (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config);
+      FlowEdge edge = flowEdgeFactory.createFlowEdge(config, flowCatalog);
+      if (!this.flowGraph.addFlowEdge(edge)) {
+        log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
+      }
+    } catch (Exception e) {
+      // the trailing throwable makes SLF4J record the stack trace in addition to the formatted message
+      log.warn("Could not add edge defined in {} due to exception {}", change.getNewPath(), e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Remove a {@link FlowEdge} from the {@link FlowGraph}. The edge id is derived purely from the path of the
+   * removed file (source directory, destination directory and file name without extension); the file itself is
+   * not read. The id is then used to delete the associated {@link FlowEdge}.
+   * Any exception is logged and the change is skipped.
+   * @param change the diff entry whose old path points to the removed edge file
+   */
+  private void removeFlowEdge(DiffEntry change) {
+    if (!checkFilePath(change.getOldPath(), EDGE_FILE_DEPTH)) {
+      return;
+    }
+    Path edgeFilePath = new Path(this.repositoryDir, change.getOldPath());
+    try {
+      Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(), edgeFilePath);
+      String edgeId = config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
+      if (!this.flowGraph.deleteFlowEdge(edgeId)) {
+        log.warn("Could not remove FlowEdge {} from FlowGraph; skipping", edgeId);
+      }
+    } catch (Exception e) {
+      // the trailing throwable makes SLF4J record the stack trace in addition to the formatted message
+      log.warn("Could not remove edge defined in {} due to exception {}", edgeFilePath, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Check whether a changed file sits at the expected place in the flow graph directory tree and has an accepted
+   * extension. A conforming file is stored at either
+   * flowGraphDir/nodeName/nodeName.properties (node file), or
+   * flowGraphDir/sourceNodeName/destinationNodeName/edgeName.properties (edge file).
+   * @param file the relative path from the repo root
+   * @param depth the depth the path must have ({@code NODE_FILE_DEPTH} or {@code EDGE_FILE_DEPTH})
+   * @return true if the file conforms; false (after logging a warning) otherwise
+   */
+  private boolean checkFilePath(String file, int depth) {
+    Path filePath = new Path(file);
+    String fileExtension = Files.getFileExtension(filePath.getName());
+
+    boolean conforms = filePath.depth() == depth
+        && checkFileLevelRelativeToRoot(filePath, depth)
+        && this.javaPropsExtensions.contains(fileExtension);
+    if (conforms) {
+      return true;
+    }
+
+    log.warn("Changed file does not conform to directory structure and file name format, skipping: "
+        + filePath);
+    return false;
+  }
+
+  /**
+   * Check that the ancestor {@code depth - 1} levels above the file is the configured flow graph folder.
+   * @param filePath path of the node/edge file; a null path never conforms
+   * @param depth expected depth of the file
+   * @return true if the name of that ancestor equals {@code folderName}
+   */
+  private boolean checkFileLevelRelativeToRoot(Path filePath, int depth) {
+    if (filePath == null) {
+      return false;
+    }
+    Path ancestor = filePath;
+    int levelsUp = depth - 1;
+    while (levelsUp > 0) {
+      ancestor = ancestor.getParent();
+      levelsUp--;
+    }
+    return ancestor.getName().equals(folderName);
+  }
+
+  /**
+   * Return a copy of the node config in which the data node id is set to the name of the directory that
+   * contains the node file.
+   * @param nodeConfig node config
+   * @param nodeFilePath path of the node file
+   * @return config with {@link FlowGraphConfigurationKeys#DATA_NODE_ID_KEY} overridden
+   */
+  private Config getNodeConfigWithOverrides(Config nodeConfig, Path nodeFilePath) {
+    return nodeConfig.withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+        ConfigValueFactory.fromAnyRef(nodeFilePath.getParent().getName()));
+  }
+
+  /**
+   * Return a copy of the edge config whose source, destination and id are derived from the edge file path:
+   * the grandparent directory names the source, the parent directory names the destination, and the id is built
+   * by {@code getEdgeId} from those two plus the file name without its extension.
+   * @param edgeConfig edge config
+   * @param edgeFilePath path of the edge file
+   * @return config with the source, destination and edge id keys overridden
+   */
+  private Config getEdgeConfigWithOverrides(Config edgeConfig, Path edgeFilePath) {
+    String source = edgeFilePath.getParent().getParent().getName();
+    String destination = edgeFilePath.getParent().getName();
+    String edgeId = getEdgeId(source, destination, Files.getNameWithoutExtension(edgeFilePath.getName()));
+
+    Config withSource =
+        edgeConfig.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source));
+    Config withDestination = withSource.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,
+        ConfigValueFactory.fromAnyRef(destination));
+    return withDestination.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY,
+        ConfigValueFactory.fromAnyRef(edgeId));
+  }
+
+  /**
+   * Load a node file through the pull file loader (with an empty fallback config) and apply the path-derived
+   * node id override.
+   * @param filePath path of the node file, as passed to the pull file loader
+   * @return the loaded configuration with the node id overridden
+   * @throws IOException if the pull file loader fails to load the file
+   */
+  private Config loadNodeFileWithOverrides(Path filePath) throws IOException {
+    return getNodeConfigWithOverrides(this.pullFileLoader.loadPullFile(filePath, emptyConfig, false), filePath);
+  }
+
+  /**
+   * Load an edge file through the pull file loader (with an empty fallback config) and apply the path-derived
+   * source, destination and edge id overrides.
+   * @param filePath path of the edge file, as passed to the pull file loader
+   * @return the loaded configuration with the edge properties overridden
+   * @throws IOException if the pull file loader fails to load the file
+   */
+  private Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
+    return getEdgeConfigWithOverrides(this.pullFileLoader.loadPullFile(filePath, emptyConfig, false), filePath);
+  }
+
+  /**
+   * Build the edge id by joining source, destination and edge name with {@code FLOW_EDGE_LABEL_JOINER_CHAR}.
+   * @param source source data node id
+   * @param destination destination data node id
+   * @param edgeName simple name of the edge (e.g. file name without extension of the edge file)
+   * @return a string of the form source:destination:edgeName identifying the edge
+   */
+  private String getEdgeId(String source, String destination, String edgeName) {
+    Joiner labelJoiner = Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR);
+    return labelJoiner.join(source, destination, edgeName);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
new file mode 100644
index 0000000..2361edc
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.ResetCommand;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.diff.DiffEntry;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.ObjectReader;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.treewalk.CanonicalTreeParser;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.PullFileLoader;
+
+
+@Slf4j
+public abstract class GitMonitoringService extends AbstractIdleService {
+ private static final String REMOTE_NAME = "origin";
+ private static final int TERMINATION_TIMEOUT = 30;
+
+ public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
+ public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+
+ private Integer pollingInterval;
+ protected final ScheduledExecutorService scheduledExecutor;
+ protected GitMonitoringService.GitRepository gitRepo;
+ protected String repositoryDir;
+ protected String folderName;
+ protected Path folderPath;
+ protected final PullFileLoader pullFileLoader;
+ protected final Set<String> javaPropsExtensions;
+ protected final Set<String> hoconFileExtensions;
+ protected volatile boolean isActive = false;
+
+  /**
+   * Set up the monitoring service from configuration: open (or clone) the git repository, build the
+   * {@link PullFileLoader} rooted at the monitored folder on the local file system, and create the
+   * single-threaded executor used for polling.
+   * @param config must contain {@link ConfigurationKeys#GIT_MONITOR_REPO_URI}; the repository directory, branch
+   *               name, polling interval, base folder and the two extension lists are also read from it
+   * @throws IllegalArgumentException if the repository URI is not configured
+   * @throws RuntimeException if the repository cannot be opened or the pull file loader cannot be created
+   */
+  public GitMonitoringService(Config config) {
+    Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_MONITOR_REPO_URI),
+        ConfigurationKeys.GIT_MONITOR_REPO_URI + " needs to be specified.");
+
+    String repoUri = config.getString(ConfigurationKeys.GIT_MONITOR_REPO_URI);
+    this.repositoryDir = config.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR);
+    String branch = config.getString(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME);
+    this.pollingInterval = config.getInt(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL);
+    this.folderName = config.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR);
+
+    try {
+      this.gitRepo = new GitRepository(repoUri, this.repositoryDir, branch);
+    } catch (GitAPIException | IOException e) {
+      throw new RuntimeException("Could not open git repository", e);
+    }
+
+    this.folderPath = new Path(this.repositoryDir, this.folderName);
+
+    // both extension lists are comma-separated strings in the config
+    String[] propsExtensions = config.getString(JAVA_PROPS_EXTENSIONS).split(",");
+    this.javaPropsExtensions = Sets.newHashSet(propsExtensions);
+    String[] hoconExtensions = config.getString(HOCON_FILE_EXTENSIONS).split(",");
+    this.hoconFileExtensions = Sets.newHashSet(hoconExtensions);
+
+    try {
+      FileSystem localFs = FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration());
+      this.pullFileLoader =
+          new PullFileLoader(this.folderPath, localFs, this.javaPropsExtensions, this.hoconFileExtensions);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create pull file loader", e);
+    }
+
+    this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor")));
+  }
+
+  /**
+   * Update the active flag that subclasses consult when deciding whether to poll.
+   * @param isActive the new state; setting the current state again changes nothing
+   */
+  synchronized void setActive(boolean isActive) {
+    if (this.isActive != isActive) {
+      this.isActive = isActive;
+    }
+  }
+
+
+  /**
+   * Start the service: schedule a task that runs immediately and then every {@code pollingInterval} seconds.
+   * Each run calls {@link #processGitConfigChanges()} when {@link #shouldPollGit()} allows it.
+   */
+  @Override
+  protected void startUp() throws Exception {
+    // log the concrete subclass; this base class is shared by more than one monitor
+    log.info("Starting the " + this.getClass().getSimpleName());
+    log.info("Polling git with interval {} ", this.pollingInterval);
+
+    // Schedule the job config fetch task
+    this.scheduledExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          if (shouldPollGit()) {
+            processGitConfigChanges();
+          }
+        } catch (GitAPIException | IOException | RuntimeException e) {
+          // scheduleAtFixedRate suppresses all later runs once a run throws, so unchecked exceptions must be
+          // caught here as well; the next run will try again since errors could be intermittent
+          log.error("Failed to process git config changes", e);
+        }
+      }
+    }, 0, this.pollingInterval, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Fetch the list of changes since the last checkpoint and hand each one to the subclass hooks: added and
+   * modified files go to {@link #addChange(DiffEntry)}, deleted files to {@link #removeChange(DiffEntry)}, and
+   * renamed files to both (remove first, then add). Once every change has been handed over, the checkpoint is
+   * moved forward.
+   * @throws GitAPIException if the git operations fail
+   * @throws IOException if reading the repository or writing the checkpoint fails
+   * @throws RuntimeException if a change has a type other than add, modify, delete or rename
+   */
+  @VisibleForTesting
+  public void processGitConfigChanges() throws GitAPIException, IOException {
+    for (DiffEntry entry : this.gitRepo.getChanges()) {
+      switch (entry.getChangeType()) {
+        case DELETE:
+          removeChange(entry);
+          break;
+        case RENAME:
+          removeChange(entry);
+          addChange(entry);
+          break;
+        case ADD:
+        case MODIFY:
+          addChange(entry);
+          break;
+        default:
+          throw new RuntimeException("Unsupported change type " + entry.getChangeType());
+      }
+    }
+
+    // Done processing changes, so checkpoint
+    this.gitRepo.moveCheckpointAndHashesForward();
+  }
+
+ /**
+ * Stop the service: stop scheduling further polls and wait up to {@code TERMINATION_TIMEOUT} seconds for the
+ * executor to terminate. The result of the wait is not checked.
+ */
+ @Override
+ protected void shutDown() throws Exception {
+ this.scheduledExecutor.shutdown();
+ this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Class for managing a git repository
+ */
+ static class GitRepository {
+ private final static String CHECKPOINT_FILE = "checkpoint.txt";
+ private final static String CHECKPOINT_FILE_TMP = "checkpoint.tmp";
+ private final String repoUri;
+ private final String repoDir;
+ private final String branchName;
+ private Git git;
+ private String lastProcessedGitHash;
+ private String latestGitHash;
+
+ /**
+ * Create an object to manage the git repository stored locally at repoDir with a repository URI of repoUri.
+ * The repository is opened or cloned as part of construction.
+ * @param repoUri URI of repository
+ * @param repoDir Directory to hold the local copy of the repository
+ * @param branchName Branch name
+ * @throws GitAPIException
+ * @throws IOException
+ */
+ GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException {
+ this.repoUri = repoUri;
+ this.repoDir = repoDir;
+ this.branchName = branchName;
+
+ initRepository();
+ }
+
+    /**
+     * Open the repository if it exists locally, otherwise clone it, then determine the commit to diff from:
+     * the hash stored in the checkpoint file if there is one, otherwise the last commit returned by the log
+     * (the first commit of the history).
+     * @throws GitAPIException if cloning or reading the log fails
+     * @throws IOException if the repository or the checkpoint cannot be read
+     * @throws RuntimeException if the local repository's {@code origin} URL is missing or differs from the
+     *                          configured repository URI
+     */
+    private void initRepository() throws GitAPIException, IOException {
+      File repoDirFile = new File(this.repoDir);
+
+      try {
+        this.git = Git.open(repoDirFile);
+
+        // null when the existing repository has no url configured for the remote
+        String uri = this.git.getRepository().getConfig().getString("remote", REMOTE_NAME, "url");
+
+        // compare from the configured side so a missing remote url reports a mismatch instead of an NPE
+        if (!this.repoUri.equals(uri)) {
+          throw new RuntimeException("Repo at " + this.repoDir + " has uri " + uri + " instead of " + this.repoUri);
+        }
+      } catch (RepositoryNotFoundException e) {
+        // if the repository was not found then clone a new one
+        this.git = Git.cloneRepository()
+            .setDirectory(repoDirFile)
+            .setURI(this.repoUri)
+            .setBranch(this.branchName)
+            .call();
+      }
+
+      try {
+        this.lastProcessedGitHash = readCheckpoint();
+      } catch (FileNotFoundException e) {
+        // if no checkpoint is available then start with the first commit
+        Iterable<RevCommit> logs = git.log().call();
+        RevCommit lastLog = null;
+
+        // walk to the end of the log; the final entry is the one kept
+        for (RevCommit log : logs) {
+          lastLog = log;
+        }
+
+        if (lastLog != null) {
+          this.lastProcessedGitHash = lastLog.getName();
+        }
+      }
+
+      this.latestGitHash = this.lastProcessedGitHash;
+    }
+
+    /**
+     * Read the last processed commit githash from the checkpoint file in the repository directory.
+     * @return the full content of the checkpoint file, decoded as UTF-8
+     * @throws IOException if the checkpoint file cannot be read
+     */
+    private String readCheckpoint() throws IOException {
+      return Files.toString(new File(this.repoDir, CHECKPOINT_FILE), Charsets.UTF_8);
+    }
+
+    /**
+     * Persist the last processed commit githash to the checkpoint file.
+     * @param gitHash the hash to store
+     * @throws IOException if writing or moving the file fails
+     */
+    private void writeCheckpoint(String gitHash) throws IOException {
+      File target = new File(this.repoDir, CHECKPOINT_FILE);
+      File staging = new File(this.repoDir, CHECKPOINT_FILE_TMP);
+
+      // write to a temporary name then rename to make the operation atomic when the file system allows a file
+      // to be replaced
+      Files.write(gitHash, staging, Charsets.UTF_8);
+      Files.move(staging, target);
+    }
+
+ /**
+ * Mark the latest fetched commit as processed: update the in-memory last processed hash first, then write the
+ * same hash to the checkpoint file.
+ * @throws IOException if the checkpoint file cannot be written
+ */
+ void moveCheckpointAndHashesForward() throws IOException {
+ this.lastProcessedGitHash = this.latestGitHash;
+
+ writeCheckpoint(this.latestGitHash);
+ }
+
+    /**
+     * Fetch from the remote, hard-reset the local working copy to the remote branch, and diff the tree of the
+     * last processed commit against the tree of the new HEAD. The hash of the new HEAD is remembered so it can
+     * be checkpointed once the returned changes have been processed.
+     * @return the name-and-status diff entries between the last processed commit and the new HEAD
+     * @throws GitAPIException if the fetch, reset or diff fails
+     * @throws IOException if repository objects cannot be read
+     */
+    List<DiffEntry> getChanges() throws GitAPIException, IOException {
+      // get tree for last processed commit
+      ObjectId oldHeadTree = git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}");
+
+      // refresh to latest and reset hard to handle forced pushes
+      this.git.fetch().setRemote(REMOTE_NAME).call();
+      // reset hard to get a clean working set since pull --rebase may leave files around
+      this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME + "/" + this.branchName).call();
+
+      ObjectId head = this.git.getRepository().resolve("HEAD");
+      ObjectId headTree = this.git.getRepository().resolve("HEAD^{tree}");
+
+      // remember the hash for the current HEAD. This will be checkpointed after the diff is processed.
+      latestGitHash = head.getName();
+
+      // diff old and new heads to find changes
+      CanonicalTreeParser oldTreeIter = new CanonicalTreeParser();
+      CanonicalTreeParser newTreeIter = new CanonicalTreeParser();
+      // the reader is only needed while the parsers load their tree data, so release it right afterwards
+      try (ObjectReader reader = this.git.getRepository().newObjectReader()) {
+        oldTreeIter.reset(reader, oldHeadTree);
+        newTreeIter.reset(reader, headTree);
+      }
+
+      return this.git.diff()
+          .setNewTree(newTreeIter)
+          .setOldTree(oldTreeIter)
+          .setShowNameAndStatusOnly(true)
+          .call();
+    }
+ }
+
+ /**
+ * Decide whether the scheduled task should process git changes on this run.
+ * @return true to call {@link #processGitConfigChanges()}, false to skip this run
+ */
+ public abstract boolean shouldPollGit();
+
+ /**
+ * Apply an added or modified file. Also called, after {@link #removeChange(DiffEntry)}, for a renamed file.
+ * @param change the diff entry describing the change
+ */
+ public abstract void addChange(DiffEntry change);
+
+ /**
+ * Apply a deleted file. Also called, before {@link #addChange(DiffEntry)}, for a renamed file.
+ * @param change the diff entry describing the change
+ */
+ public abstract void removeChange(DiffEntry change);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index 12b6222..fc82cc1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -17,19 +17,19 @@
package org.apache.gobblin.service.modules.flowgraph;
-import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.security.UserGroupInformation;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
@@ -37,17 +37,12 @@ import org.apache.gobblin.service.modules.template.FlowTemplate;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.util.ConfigUtils;
-import joptsimple.internal.Strings;
-import lombok.Getter;
-
/**
* An implementation of {@link FlowEdge}.
*/
@Alpha
public class BaseFlowEdge implements FlowEdge {
- public static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":";
-
@Getter
protected List<String> endPoints;
@@ -67,13 +62,13 @@ public class BaseFlowEdge implements FlowEdge {
private boolean active;
//Constructor
- public BaseFlowEdge(List<String> endPoints, String edgeName, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
+ public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
this.endPoints = endPoints;
this.flowTemplate = flowTemplate;
this.executors = executors;
this.active = active;
this.props = properties;
- this.id = generateEdgeId(endPoints, edgeName);
+ this.id = edgeId;
}
@Override
@@ -81,10 +76,6 @@ public class BaseFlowEdge implements FlowEdge {
return true;
}
- @VisibleForTesting
- protected static String generateEdgeId(List<String> endPoints, String edgeName) {
- return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(endPoints.get(0), endPoints.get(1), edgeName);
- }
/**
* The {@link FlowEdge}s are the same if they have the same endpoints and both refer to the same {@link FlowTemplate} i.e.
* the {@link FlowTemplate} uris are the same
@@ -100,11 +91,11 @@ public class BaseFlowEdge implements FlowEdge {
FlowEdge that = (FlowEdge) o;
- if(!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) {
+ if (!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) {
return false;
}
- if(!this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri())) {
+ if (!this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri())) {
return false;
}
return true;
@@ -135,17 +126,17 @@ public class BaseFlowEdge implements FlowEdge {
@Override
public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException {
try {
- String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"");
+ String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
- String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,"");
+ String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "");
Preconditions.checkArgument(!Strings.isNullOrEmpty(destination), "A FlowEdge must have a non-null or empty destination");
List<String> endPoints = Lists.newArrayList(source, destination);
- String edgeName = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY,"");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name");
+ String edgeId = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeId), "A FlowEdge must have a non-null or empty Id");
List<Config> specExecutorConfigList = new ArrayList<>();
boolean flag;
- for(int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)) != false; i++) {
+ for (int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); i++) {
specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i));
}
@@ -168,26 +159,12 @@ public class BaseFlowEdge implements FlowEdge {
specExecutors.add(executor);
}
FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri));
- return new BaseFlowEdge(endPoints, edgeName, flowTemplate, specExecutors, edgeProps, isActive);
- } catch(RuntimeException e) {
+ return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive);
+ } catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new FlowEdgeCreationException(e);
}
}
-
- @Override
- public String getEdgeId(Config edgeProps) throws IOException {
- String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,"");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source");
- String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,"");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty destination");
- String edgeName =
- ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, "");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name");
- List<String> endPoints = Lists.newArrayList(source, destination);
-
- return generateEdgeId(endPoints, edgeName);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
index 851e887..2977231 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java
@@ -17,13 +17,9 @@
package org.apache.gobblin.service.modules.flowgraph;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
-
import com.typesafe.config.Config;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
public interface FlowEdgeFactory {
/**
@@ -36,13 +32,6 @@ public interface FlowEdgeFactory {
*/
public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException;
- /**
- * Get an edge label from the edge properties
- * @param edgeProps properties of the edge
- * @return a string label identifying the edge
- */
- public String getEdgeId(Config edgeProps) throws IOException;
-
public class FlowEdgeCreationException extends Exception {
private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
index 0d94e3f..cd4876a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java
@@ -24,17 +24,22 @@ public class FlowGraphConfigurationKeys {
/**
* {@link DataNode} configuration keys.
*/
+ public static final String DATA_NODE_CLASS = DATA_NODE_PREFIX + "class";
+ public static final String DEFAULT_DATA_NODE_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseDataNode";
public static final String DATA_NODE_ID_KEY = DATA_NODE_PREFIX + "id";
public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive";
/**
* {@link FlowEdge} configuration keys.
*/
+ public static final String FLOW_EDGE_FACTORY_CLASS = FLOW_EDGE_PREFIX + "factory.class";
+ public static final String DEFAULT_FLOW_EDGE_FACTORY_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseFlowEdge$Factory";
public static final String FLOW_EDGE_SOURCE_KEY = FLOW_EDGE_PREFIX + "source";
public static final String FLOW_EDGE_DESTINATION_KEY = FLOW_EDGE_PREFIX + "destination";
+ public static final String FLOW_EDGE_ID_KEY = FLOW_EDGE_PREFIX + "id";
public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name";
public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive";
public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri";
- public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX +"specExecutors";
+ public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors";
public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
index 6cd8bf2..71ad56d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
@@ -93,10 +93,11 @@ public class GitConfigMonitorTest {
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
this.config = ConfigBuilder.create()
- .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath())
- .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
+ .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI,
+ this.remoteRepo.getDirectory().getAbsolutePath())
+ .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
.addPrimitive(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
- .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5)
+ .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
.build();
this.flowCatalog = new FlowCatalog(config);
@@ -153,7 +154,7 @@ public class GitConfigMonitorTest {
Collection<Spec> specs = this.flowCatalog.getSpecs();
Assert.assertTrue(specs.size() == 1);
- FlowSpec spec = (FlowSpec)(specs.iterator().next());
+ FlowSpec spec = (FlowSpec) (specs.iterator().next());
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
@@ -176,7 +177,7 @@ public class GitConfigMonitorTest {
Collection<Spec> specs = this.flowCatalog.getSpecs();
Assert.assertTrue(specs.size() == 1);
- FlowSpec spec = (FlowSpec)(specs.iterator().next());
+ FlowSpec spec = (FlowSpec) (specs.iterator().next());
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
@@ -234,13 +235,13 @@ public class GitConfigMonitorTest {
}
});
- FlowSpec spec = (FlowSpec)specList.get(0);
+ FlowSpec spec = (FlowSpec) specList.get(0);
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
Assert.assertEquals(spec.getConfig().getString("param1"), "value1");
- spec = (FlowSpec)specList.get(1);
+ spec = (FlowSpec) specList.get(1);
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow2"));
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow2");
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
@@ -279,13 +280,13 @@ public class GitConfigMonitorTest {
}
});
- spec = (FlowSpec)specList.get(0);
+ spec = (FlowSpec) specList.get(0);
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow2"));
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow2");
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
Assert.assertEquals(spec.getConfig().getString("param1"), "value4");
- spec = (FlowSpec)specList.get(1);
+ spec = (FlowSpec) specList.get(1);
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow3"));
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow3");
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
@@ -325,7 +326,7 @@ public class GitConfigMonitorTest {
specs = this.flowCatalog.getSpecs();
Assert.assertTrue(specs.size() == 1);
- FlowSpec spec = (FlowSpec)(specs.iterator().next());
+ FlowSpec spec = (FlowSpec) (specs.iterator().next());
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 926dd10..68b030b 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -95,7 +95,7 @@ public class GobblinServiceManagerTest {
Properties serviceCoreProperties = new Properties();
serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR);
serviceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
- serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY , TEST_GOBBLIN_EXECUTOR_NAME);
+ serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME);
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".description",
"StandaloneTestExecutor");
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".version",
@@ -108,9 +108,9 @@ public class GobblinServiceManagerTest {
TEST_SOURCE_NAME + ":" + TEST_SINK_NAME);
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, true);
- serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR);
- serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR);
- serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5);
+ serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR);
+ serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR);
+ serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5);
// Create a bare repository
RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
@@ -200,7 +200,7 @@ public class GobblinServiceManagerTest {
Assert.assertEquals(flowConfig.getId().getFlowGroup(), TEST_GROUP_NAME);
Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
- Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE );
+ Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE);
Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
// Add this assert back when getFlowSpec() is changed to return the raw flow spec
@@ -228,7 +228,7 @@ public class GobblinServiceManagerTest {
Assert.assertEquals(retrievedFlowConfig.getId().getFlowGroup(), TEST_GROUP_NAME);
Assert.assertEquals(retrievedFlowConfig.getId().getFlowName(), TEST_FLOW_NAME);
- Assert.assertEquals(retrievedFlowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE );
+ Assert.assertEquals(retrievedFlowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE);
Assert.assertEquals(retrievedFlowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
// Add this assert when getFlowSpec() is changed to return the raw flow spec
//Assert.assertEquals(flowConfig.getProperties().size(), 2);
@@ -279,7 +279,7 @@ public class GobblinServiceManagerTest {
specs = this.gobblinServiceManager.flowCatalog.getSpecs();
Assert.assertTrue(specs.size() == 1);
- FlowSpec spec = (FlowSpec)(specs.iterator().next());
+ FlowSpec spec = (FlowSpec) (specs.iterator().next());
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
index 9738344..04f2270 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java
@@ -51,6 +51,10 @@ public class BaseFlowGraphTest {
private FlowEdge edge3;
private FlowEdge edge3c;
+ private String edgeId1;
+ private String edgeId2;
+ private String edgeId3;
+
BaseFlowGraph graph;
@BeforeClass
public void setUp()
@@ -58,33 +62,40 @@ public class BaseFlowGraphTest {
IOException, DataNode.DataNodeCreationException {
Properties properties = new Properties();
properties.put("key1", "val1");
- Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node1"));
+ Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+ ConfigValueFactory.fromAnyRef("node1"));
node1 = new BaseDataNode(node1Config);
properties = new Properties();
properties.put("key2", "val2");
- Config node2Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node2"));
+ Config node2Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+ ConfigValueFactory.fromAnyRef("node2"));
node2 = new BaseDataNode(node2Config);
properties = new Properties();
properties.put("key3", "val3");
- Config node3Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node3"));
+ Config node3Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+ ConfigValueFactory.fromAnyRef("node3"));
node3 = new BaseDataNode(node3Config);
//Create a clone of node3
node3c = new BaseDataNode(node3Config);
- FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"),"","", ConfigFactory.empty(),null, null, null);
- FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"),"","", ConfigFactory.empty(),null, null, null);
- FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"),"","", ConfigFactory.empty(),null, null, null);
+ FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null, null);
+ FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null, null);
+ FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null, null);
//Create edge instances
- edge1 = new BaseFlowEdge(Lists.newArrayList("node1", "node2"), "edge1", flowTemplate1, null, ConfigFactory.empty(), true);
- edge2 = new BaseFlowEdge(Lists.newArrayList("node2", "node3"), "edge2", flowTemplate2, null, ConfigFactory.empty(), true);
- edge3 = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), "edge3", flowTemplate3, null, ConfigFactory.empty(), true);
+ edgeId1 = "node1:node2:edge1";
+ edgeId2 = "node2:node3:edge2";
+ edgeId3 = "node3:node1:edge3";
+
+ edge1 = new BaseFlowEdge(Lists.newArrayList("node1", "node2"), edgeId1, flowTemplate1, null, ConfigFactory.empty(), true);
+ edge2 = new BaseFlowEdge(Lists.newArrayList("node2", "node3"), edgeId2, flowTemplate2, null, ConfigFactory.empty(), true);
+ edge3 = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), edgeId3, flowTemplate3, null, ConfigFactory.empty(), true);
//Create a clone of edge3
- edge3c = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), "edge3", flowTemplate3, null, ConfigFactory.empty(), true);
+ edge3c = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), edgeId3, flowTemplate3, null, ConfigFactory.empty(), true);
//Create a FlowGraph
graph = new BaseFlowGraph();
@@ -193,8 +204,7 @@ public class BaseFlowGraphTest {
@Test (dependsOnMethods = "testDeleteDataNode")
public void testDeleteFlowEdgeById() throws Exception {
- String edgeLabel1 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node1", "node2"), "edge1");
- Assert.assertTrue(graph.deleteFlowEdge(edgeLabel1));
+ Assert.assertTrue(graph.deleteFlowEdge(edgeId1));
Assert.assertEquals(graph.getEdges("node1").size(), 0);
Assert.assertEquals(graph.getEdges("node2").size(), 1);
Assert.assertEquals(graph.getEdges("node3").size(), 1);
@@ -203,8 +213,7 @@ public class BaseFlowGraphTest {
Assert.assertTrue(graph.getEdges("node2").contains(edge2));
Assert.assertTrue(graph.getEdges("node3").contains(edge3));
- String edgeLabel2 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node2", "node3"), "edge2");
- Assert.assertTrue(graph.deleteFlowEdge(edgeLabel2));
+ Assert.assertTrue(graph.deleteFlowEdge(edgeId2));
Assert.assertEquals(graph.getEdges("node1").size(), 0);
Assert.assertEquals(graph.getEdges("node2").size(), 0);
Assert.assertEquals(graph.getEdges("node3").size(), 1);
@@ -213,8 +222,7 @@ public class BaseFlowGraphTest {
Assert.assertTrue(!graph.getEdges("node2").contains(edge2));
Assert.assertTrue(graph.getEdges("node3").contains(edge3));
- String edgeLabel3 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node3", "node1"), "edge3");
- Assert.assertTrue(graph.deleteFlowEdge(edgeLabel3));
+ Assert.assertTrue(graph.deleteFlowEdge(edgeId3));
Assert.assertEquals(graph.getEdges("node1").size(), 0);
Assert.assertEquals(graph.getEdges("node2").size(), 0);
Assert.assertEquals(graph.getEdges("node3").size(), 0);
@@ -223,8 +231,8 @@ public class BaseFlowGraphTest {
Assert.assertTrue(!graph.getEdges("node2").contains(edge2));
Assert.assertTrue(!graph.getEdges("node3").contains(edge3));
- Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel1));
- Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel2));
- Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel3));
+ Assert.assertTrue(!graph.deleteFlowEdge(edgeId1));
+ Assert.assertTrue(!graph.deleteFlowEdge(edgeId2));
+ Assert.assertTrue(!graph.deleteFlowEdge(edgeId3));
}
}
\ No newline at end of file