You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/25 16:29:16 UTC
incubator-gobblin git commit: [GOBBLIN-291] Remove unnecessary spec
list and read
Repository: incubator-gobblin
Updated Branches:
refs/heads/master f3eadceed -> 2d05b03d5
[GOBBLIN-291] Remove unnecessary spec list and read
resolve review comments
Closes #2147 from arjun4084346/flowDelay
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2d05b03d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2d05b03d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2d05b03d
Branch: refs/heads/master
Commit: 2d05b03d5e76f9e4056b2adc321ff4b3ef778dc5
Parents: f3eadce
Author: Arjun <ab...@linkedin.com>
Authored: Wed Oct 25 09:29:11 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Oct 25 09:29:11 2017 -0700
----------------------------------------------------------------------
.../gobblin/service/FlowConfigsResource.java | 14 ++--
.../gobblin/runtime/api/MutableSpecCatalog.java | 5 +-
.../runtime/spec_catalog/FlowCatalog.java | 12 ++-
.../gobblin/runtime/spec_store/FSSpecStore.java | 85 +++++++++++++++-----
.../gobblin/spec_catalog/FlowCatalogTest.java | 10 ++-
.../service/modules/core/GitConfigMonitor.java | 17 ++--
.../modules/orchestration/OrchestratorTest.java | 3 +-
7 files changed, 102 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index 3159d49..a99087c 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -192,14 +192,12 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
"/" + flowConfig.getId().getFlowGroup() + "/" + flowConfig.getId().getFlowName(), null, null);
- if (getFlowCatalog().getSpec(flowUri) != null) {
+ if (getFlowCatalog().exists(flowUri)) {
logAndThrowRestLiServiceException(HttpStatus.S_409_CONFLICT,
"Flow with the same name already exists: " + flowUri, null);
}
} catch (URISyntaxException e) {
logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getId().getFlowName(), e);
- } catch (SpecNotFoundException e) {
- // okay if flow does not exist
}
getFlowCatalog().put(createFlowSpecForConfig(flowConfig));
@@ -231,7 +229,11 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
"/" + flowGroup + "/" + flowName, null, null);
- FlowSpec oldFlowSpec = (FlowSpec) getFlowCatalog().getSpec(flowUri);
+ if (!getFlowCatalog().exists(flowUri)) {
+ logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND,
+ "Flow does not exist: flowGroup " + flowGroup + " flowName " + flowName, null);
+ }
+
FlowSpec newFlowSpec = createFlowSpecForConfig(flowConfig);
getFlowCatalog().put(newFlowSpec);
@@ -239,9 +241,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
return new UpdateResponse(HttpStatus.S_200_OK);
} catch (URISyntaxException e) {
logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e);
- } catch (SpecNotFoundException e) {
- logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Flow does not exist: flowGroup " + flowGroup +
- " flowName " + flowName, null);
}
return null;
@@ -265,7 +264,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
"/" + flowGroup + "/" + flowName, null, null);
- FlowSpec flowSpec = (FlowSpec) getFlowCatalog().getSpec(flowUri);
getFlowCatalog().remove(flowUri);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 1751a56..f63600a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -34,7 +34,8 @@ public interface MutableSpecCatalog extends SpecCatalog {
public void put(Spec spec);
/**
- * Removes an existing {@link Spec} with the given URI. A no-op if such {@link Spec} does not exist.
+ * Removes an existing {@link Spec} with the given URI.
+ * Throws SpecNotFoundException if such {@link Spec} does not exist.
*/
- void remove(URI uri);
+ void remove(URI uri) throws SpecNotFoundException;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 8ffa4d7..1cb09da 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -209,6 +209,14 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
}
+ public boolean exists(URI uri) {
+ try {
+ return specStore.exists(uri);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e);
+ }
+ }
+
@Override
public Spec getSpec(URI uri) throws SpecNotFoundException {
try {
@@ -240,7 +248,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
@Override
- public void remove(URI uri) {
+ public void remove(URI uri) throws SpecNotFoundException {
try {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(uri);
@@ -250,7 +258,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion());
specStore.deleteSpec(uri);
- } catch (IOException | SpecNotFoundException e) {
+ } catch (IOException e) {
throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 608d390..b283c87 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.runtime.spec_store;
import com.google.common.io.ByteStreams;
+
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
@@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.io.Files;
import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -98,20 +101,77 @@ public class FSSpecStore implements SpecStore {
}
}
+ /**
+ * @param specUri path of the spec
+ * @return empty string for topology spec, as topologies do not have a group,
+ * group name for flow spec
+ */
+ public static String getSpecGroup(Path specUri) {
+ return specUri.getParent().getName();
+ }
+
+ public static String getSpecName(Path specUri) {
+ return Files.getNameWithoutExtension(specUri.getName());
+ }
+
+ private Collection<Spec> getAllVersionsOfSpec(String specGroup, String specName) throws IOException {
+ Collection<Spec> specs = Lists.newArrayList();
+ FileStatus[] fileStatuses;
+ try {
+ fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup);
+ } catch (FileNotFoundException e) {
+ return specs;
+ }
+
+ for (FileStatus fileStatus : fileStatuses) {
+ if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) {
+ specs.add(readSpecFromFile(fileStatus.getPath()));
+ }
+ }
+ return specs;
+ }
+
+ @Override
+ public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException {
+ Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
+ Path specPath = new Path(specUri.getPath());
+ return getAllVersionsOfSpec(getSpecGroup(specPath), getSpecName(specPath));
+ }
+
@Override
public boolean exists(URI specUri) throws IOException {
Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
+ Path flowPath = new Path(specUri.getPath());
+ String specGroup = getSpecGroup(flowPath);
+ String specName = getSpecName(flowPath);
+ FileStatus[] fileStatuses;
+ try {
+ fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup);
+ } catch (FileNotFoundException e) {
+ return false;
+ }
- FileStatus[] fileStatuses = fs.listStatus(this.fsSpecStoreDirPath);
+ // TODO Fix ETL-6496
+ // We need to revisit having a version delimiter.
+ // Currently without a delimiter the prefix check may match other specs that should not be matched.
for (FileStatus fileStatus : fileStatuses) {
- if (StringUtils.startsWith(fileStatus.getPath().getName(), specUri.toString())) {
+ if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) {
return true;
}
}
-
return false;
}
+ private FileStatus[] listSpecs(Path fsSpecStoreDirPath, String specGroup) throws FileNotFoundException, IOException {
+ FileStatus[] fileStatuses;
+ if (StringUtils.isEmpty(specGroup)) {
+ fileStatuses = fs.listStatus(fsSpecStoreDirPath);
+ } else {
+ fileStatuses = fs.listStatus(new Path(fsSpecStoreDirPath, specGroup));
+ }
+ return fileStatuses;
+ }
+
@Override
public void addSpec(Spec spec) throws IOException {
Preconditions.checkArgument(null != spec, "Spec should not be null");
@@ -207,25 +267,6 @@ public class FSSpecStore implements SpecStore {
}
@Override
- public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException, SpecNotFoundException {
- Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
-
- Collection<Spec> specs = getSpecs();
- Collection<Spec> filteredSpecs = Lists.newArrayList();
- for (Spec spec : specs) {
- if (spec.getUri().equals(specUri)) {
- filteredSpecs.add(spec);
- }
- }
-
- if (filteredSpecs.size() == 0) {
- throw new SpecNotFoundException(specUri);
- }
-
- return filteredSpecs;
- }
-
- @Override
public Collection<Spec> getSpecs() throws IOException {
Collection<Spec> specs = Lists.newArrayList();
try {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
index 73c1f46..ae2e087 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ public class FlowCatalogTest {
private static final String SPEC_STORE_PARENT_DIR = "/tmp";
private static final String SPEC_STORE_DIR = "/tmp/flowTestSpecStore";
+ private static final String SPEC_GROUP_DIR = "/tmp/flowTestSpecStore/flowTestGroupDir";
private static final String SPEC_DESCRIPTION = "Test Flow Spec";
private static final String SPEC_VERSION = "1";
@@ -142,7 +144,7 @@ public class FlowCatalogTest {
}
@Test (dependsOnMethods = "createFlowSpec")
- public void deleteFlowSpec() {
+ public void deleteFlowSpec() throws SpecNotFoundException {
// List Current Specs
Collection<Spec> specs = flowCatalog.getSpecs();
logger.info("[Before Delete] Number of specs: " + specs.size());
@@ -157,18 +159,18 @@ public class FlowCatalogTest {
// List Specs after adding
specs = flowCatalog.getSpecs();
- logger.info("[After Create] Number of specs: " + specs.size());
+ logger.info("[After Delete] Number of specs: " + specs.size());
i = 0;
for (Spec spec : specs) {
flowSpec = (FlowSpec) spec;
- logger.info("[After Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
+ logger.info("[After Delete] Spec " + i++ + ": " + gson.toJson(flowSpec));
}
Assert.assertTrue(specs.size() == 0, "Spec store should be empty after deletion");
}
public URI computeFlowSpecURI() {
// Make sure this is relative
- URI uri = PathUtils.relativizePath(new Path(SPEC_STORE_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
+ URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
return uri;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 82f3e0d..00f8fc2 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -234,8 +236,8 @@ public class GitConfigMonitor extends AbstractIdleService {
private void removeSpec(DiffEntry change) {
if (checkConfigFilePath(change.getOldPath())) {
Path configFilePath = new Path(this.repositoryDir, change.getOldPath());
- String flowName = Files.getNameWithoutExtension(configFilePath.getName());
- String flowGroup = configFilePath.getParent().getName();
+ String flowName = FSSpecStore.getSpecName(configFilePath);
+ String flowGroup = FSSpecStore.getSpecGroup(configFilePath);
// build a dummy config to get the proper URI for delete
Config dummyConfig = ConfigBuilder.create()
@@ -249,7 +251,12 @@ public class GitConfigMonitor extends AbstractIdleService {
.withDescription(SPEC_DESCRIPTION)
.build();
- this.flowCatalog.remove(spec.getUri());
+ try {
+ this.flowCatalog.remove(spec.getUri());
+ } catch (SpecNotFoundException e) {
+ // okay if flow does not exist
+ log.warn("Flow {} does not exist.", spec.getUri());
+ }
}
}
@@ -285,8 +292,8 @@ public class GitConfigMonitor extends AbstractIdleService {
*/
private Config loadConfigFileWithFlowNameOverrides(Path configFilePath) throws IOException {
Config flowConfig = this.pullFileLoader.loadPullFile(configFilePath, emptyConfig, false);
- String flowName = Files.getNameWithoutExtension(configFilePath.getName());
- String flowGroup = configFilePath.getParent().getName();
+ String flowName = FSSpecStore.getSpecName(configFilePath);
+ String flowGroup = FSSpecStore.getSpecGroup(configFilePath);
return flowConfig.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index a933e85..8896068 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -60,6 +60,7 @@ public class OrchestratorTest {
private static final String SPEC_VERSION = "1";
private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore";
private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore";
+ private static final String FLOW_SPEC_GROUP_DIR = "/tmp/orchestrator/flowTestSpecStore/flowTestGroupDir";
private ServiceBasedAppLauncher serviceLauncher;
private TopologyCatalog topologyCatalog;
@@ -141,7 +142,7 @@ public class OrchestratorTest {
FlowSpec.Builder flowSpecBuilder = null;
try {
flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
- FLOW_SPEC_STORE_DIR))
+ FLOW_SPEC_GROUP_DIR))
.withConfig(config)
.withDescription(SPEC_DESCRIPTION)
.withVersion(SPEC_VERSION)