You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/07 22:05:09 UTC
incubator-gobblin git commit: [GOBBLIN-304] Change default version of flow specs to null.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 0e5561519 -> c385f1ddd
[GOBBLIN-304] Change default version of flow specs to null.
Closes #2159 from arjun4084346/removeSpecVersion
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c385f1dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c385f1dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c385f1dd
Branch: refs/heads/master
Commit: c385f1dddb0124f0ff2545d9a591f3b055f76080
Parents: 0e55615
Author: Arjun <ab...@linkedin.com>
Authored: Tue Nov 7 14:04:53 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Nov 7 14:04:53 2017 -0800
----------------------------------------------------------------------
.../apache/gobblin/service/FlowConfigTest.java | 12 +--
.../gobblin/service/FlowConfigsResource.java | 40 +--------
.../apache/gobblin/runtime/api/FlowSpec.java | 7 +-
.../runtime/spec_catalog/FlowCatalog.java | 17 ++--
.../runtime/spec_catalog/TopologyCatalog.java | 15 +---
.../gobblin/runtime/spec_store/FSSpecStore.java | 92 +++++---------------
.../gobblin/spec_catalog/FlowCatalogTest.java | 7 +-
.../spec_catalog/TopologyCatalogTest.java | 3 +-
.../service/modules/core/GitConfigMonitor.java | 7 +-
.../modules/core/GobblinServiceHATest.java | 12 +--
.../modules/core/GobblinServiceManagerTest.java | 25 ++----
.../core/IdentityFlowToJobSpecCompilerTest.java | 4 +-
.../MultiHopsFlowToJobSpecCompilerTest.java | 4 +-
.../modules/orchestration/OrchestratorTest.java | 2 +-
14 files changed, 70 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 9ac7d37..a373762 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -171,11 +171,8 @@ public class FlowConfigTest {
try {
_client.createFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.assertEquals(e.getStatus(), HttpStatus.S_409_CONFLICT.getCode());
- return;
+ Assert.fail("Create Again should pass without complaining that the spec already exists.");
}
-
- Assert.fail("Get should have gotten a 409 error");
}
@Test (dependsOnMethods = "testCreateAgain")
@@ -187,7 +184,7 @@ public class FlowConfigTest {
Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE );
Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
- Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
+ Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
// Add this asssert back when getFlowSpec() is changed to return the raw flow spec
//Assert.assertEquals(flowConfig.getProperties().size(), 1);
Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
@@ -282,11 +279,8 @@ public class FlowConfigTest {
try {
_client.updateFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND.getCode());
- return;
+ Assert.fail("Bad update should pass without complaining that the spec does not exists.");
}
-
- Assert.fail("Get should have raised a 404 error");
}
@AfterClass(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index a99087c..f0bce17 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -123,7 +123,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
flowConfig.setSchedule(schedule);
}
-
// remove keys that were injected as part of flowSpec creation
flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
@@ -180,26 +179,13 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
public CreateResponse create(FlowConfig flowConfig) {
LOG.info("Create called with flowName " + flowConfig.getId().getFlowName());
- LOG.info("ReadyToUse is: " + readyToUse);
- LOG.info("FlowCatalog is: " + getFlowCatalog());
+ LOG.debug("ReadyToUse is: " + readyToUse);
+ LOG.debug("FlowCatalog is: " + getFlowCatalog());
if (!readyToUse && getFlowCatalog() == null) {
throw new RuntimeException("Not ready for use.");
}
- try {
- URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
- URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
- "/" + flowConfig.getId().getFlowGroup() + "/" + flowConfig.getId().getFlowName(), null, null);
-
- if (getFlowCatalog().exists(flowUri)) {
- logAndThrowRestLiServiceException(HttpStatus.S_409_CONFLICT,
- "Flow with the same name already exists: " + flowUri, null);
- }
- } catch (URISyntaxException e) {
- logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getId().getFlowName(), e);
- }
-
getFlowCatalog().put(createFlowSpecForConfig(flowConfig));
return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED);
@@ -216,7 +202,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key, FlowConfig flowConfig) {
String flowGroup = key.getKey().getFlowGroup();
String flowName = key.getKey().getFlowName();
- URI flowUri = null;
LOG.info("Update called with flowGroup " + flowGroup + " flowName " + flowName);
@@ -225,25 +210,9 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
"flowName and flowGroup cannot be changed in update", null);
}
- try {
- URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null);
- flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(),
- "/" + flowGroup + "/" + flowName, null, null);
- if (!getFlowCatalog().exists(flowUri)) {
- logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND,
- "Flow does not exist: flowGroup " + flowGroup + " flowName " + flowName, null);
- }
-
- FlowSpec newFlowSpec = createFlowSpecForConfig(flowConfig);
-
- getFlowCatalog().put(newFlowSpec);
+ getFlowCatalog().put(createFlowSpecForConfig(flowConfig));
return new UpdateResponse(HttpStatus.S_200_OK);
- } catch (URISyntaxException e) {
- logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e);
- }
-
- return null;
}
/**
@@ -270,9 +239,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt
return new UpdateResponse(HttpStatus.S_200_OK);
} catch (URISyntaxException e) {
logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e);
- } catch (SpecNotFoundException e) {
- logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Flow does not exist: flowGroup " + flowGroup +
- " flowName " + flowName, null);
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index e7c08d2..59a5025 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -126,16 +126,17 @@ public class FlowSpec implements Configurable, Spec {
* <li> Default flowCatalogURI is {@link #DEFAULT_FLOW_CATALOG_SCHEME}:
* <li> Convention for FlowSpec URI: <flowCatalogURI>/config.get({@link ConfigurationKeys#FLOW_GROUP_KEY})/config.get({@link ConfigurationKeys#FLOW_NAME_KEY})
* <li> Convention for Description: config.get({@link ConfigurationKeys#FLOW_DESCRIPTION_KEY})
- * <li> Default version: 1
+ * <li> Default version: empty
* </ul>
*/
public static class Builder {
public static final String DEFAULT_FLOW_CATALOG_SCHEME = "gobblin-flow";
+ public static final String DEFAULT_VERSION = "";
@VisibleForTesting
private Optional<Config> config = Optional.absent();
private Optional<Properties> configAsProperties = Optional.absent();
private Optional<URI> uri;
- private String version = "1";
+ private String version = FlowSpec.Builder.DEFAULT_VERSION;
private Optional<String> description = Optional.absent();
private Optional<URI> flowCatalogURI = Optional.absent();
private Optional<Set<URI>> templateURIs = Optional.absent();
@@ -163,7 +164,7 @@ public class FlowSpec implements Configurable, Spec {
public FlowSpec build() {
Preconditions.checkNotNull(this.uri);
- Preconditions.checkNotNull(this.version);
+ Preconditions.checkArgument(null != version, "Version should not be null");
return new FlowSpec(getURI(), getVersion(), getDescription(), getConfig(),
getConfigAsProperties(), getTemplateURIs(), getChildSpecs());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 1cb09da..ecfe036 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -234,29 +234,22 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
((FlowSpec) spec).getConfigAsProperties()));
- if (specStore.exists(spec.getUri())) {
- specStore.updateSpec(spec);
- this.listeners.onUpdateSpec(spec);
- } else {
- specStore.addSpec(spec);
- this.listeners.onAddSpec(spec);
- }
-
- } catch (IOException | SpecNotFoundException e) {
+ specStore.addSpec(spec);
+ this.listeners.onAddSpec(spec);
+ } catch (IOException e) {
throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
}
}
@Override
- public void remove(URI uri) throws SpecNotFoundException {
+ public void remove(URI uri) {
try {
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
Preconditions.checkNotNull(uri);
log.info(String.format("Removing FlowSpec with URI: %s", uri));
- Spec spec = specStore.getSpec(uri);
- this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion());
specStore.deleteSpec(uri);
+ this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
} catch (IOException e) {
throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index c6e02d2..7bb8b9c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -30,6 +30,7 @@ import lombok.Getter;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -232,15 +233,9 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
log.info(String.format("Adding TopologySpec with URI: %s and Config: %s", spec.getUri(),
((TopologySpec) spec).getConfigAsProperties()));
- if (specStore.exists(spec.getUri())) {
- specStore.updateSpec(spec);
- this.listeners.onUpdateSpec(spec);
- } else {
specStore.addSpec(spec);
this.listeners.onAddSpec(spec);
- }
-
- } catch (IOException | SpecNotFoundException e) {
+ } catch (IOException e) {
throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
}
}
@@ -252,11 +247,9 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
Preconditions.checkNotNull(uri);
log.info(String.format("Removing TopologySpec with URI: %s", uri));
- Spec spec = specStore.getSpec(uri);
- this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion());
+ this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
specStore.deleteSpec(uri);
-
- } catch (IOException | SpecNotFoundException e) {
+ } catch (IOException e) {
throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index b283c87..d87d6d4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -25,6 +25,7 @@ import java.net.URI;
import java.util.Collection;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -114,62 +115,36 @@ public class FSSpecStore implements SpecStore {
return Files.getNameWithoutExtension(specUri.getName());
}
- private Collection<Spec> getAllVersionsOfSpec(String specGroup, String specName) throws IOException {
+ private Collection<Spec> getAllVersionsOfSpec(Path spec) {
Collection<Spec> specs = Lists.newArrayList();
- FileStatus[] fileStatuses;
- try {
- fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup);
- } catch (FileNotFoundException e) {
- return specs;
- }
- for (FileStatus fileStatus : fileStatuses) {
- if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) {
- specs.add(readSpecFromFile(fileStatus.getPath()));
- }
+ try {
+ specs.add(readSpecFromFile(spec));
+ } catch (IOException e) {
+ log.warn("Spec {} not found.", spec);
}
return specs;
}
+ /**
+ * Returns all versions of the spec defined by specUri.
+ * Currently, multiple versions are not supported, so this should return exactly one spec.
+ * @param specUri URI for the {@link Spec} to be retrieved.
+ * @return all versions of the spec.
+ */
@Override
- public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException {
+ public Collection<Spec> getAllVersionsOfSpec(URI specUri) {
Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
- Path specPath = new Path(specUri.getPath());
- return getAllVersionsOfSpec(getSpecGroup(specPath), getSpecName(specPath));
+ Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, FlowSpec.Builder.DEFAULT_VERSION);
+ return getAllVersionsOfSpec(specPath);
}
@Override
public boolean exists(URI specUri) throws IOException {
Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
- Path flowPath = new Path(specUri.getPath());
- String specGroup = getSpecGroup(flowPath);
- String specName = getSpecName(flowPath);
- FileStatus[] fileStatuses;
- try {
- fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup);
- } catch (FileNotFoundException e) {
- return false;
- }
- // TODO Fix ETL-6496
- // We need to revisit having a version delimiter.
- // Currently without a delimiter the prefix check may match other specs that should not be matched.
- for (FileStatus fileStatus : fileStatuses) {
- if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) {
- return true;
- }
- }
- return false;
- }
-
- private FileStatus[] listSpecs(Path fsSpecStoreDirPath, String specGroup) throws FileNotFoundException, IOException {
- FileStatus[] fileStatuses;
- if (StringUtils.isEmpty(specGroup)) {
- fileStatuses = fs.listStatus(fsSpecStoreDirPath);
- } else {
- fileStatuses = fs.listStatus(new Path(fsSpecStoreDirPath, specGroup));
- }
- return fileStatuses;
+ Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, FlowSpec.Builder.DEFAULT_VERSION);
+ return fs.exists(specPath);
}
@Override
@@ -192,11 +167,7 @@ public class FSSpecStore implements SpecStore {
public boolean deleteSpec(URI specUri) throws IOException {
Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
- try {
- return deleteSpec(specUri, getSpec(specUri).getVersion());
- } catch (SpecNotFoundException e) {
- throw new IOException(String.format("Issue in removing Spec: %s", specUri), e);
- }
+ return deleteSpec(specUri, FlowSpec.Builder.DEFAULT_VERSION);
}
@Override
@@ -207,13 +178,7 @@ public class FSSpecStore implements SpecStore {
try {
log.info(String.format("Deleting Spec with URI: %s in FSSpecStore: %s", specUri, this.fsSpecStoreDirPath));
Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, version);
-
- if (fs.exists(specPath)) {
- return fs.delete(specPath, false);
- } else {
- log.warn("No file with URI:" + specUri + " is found. Deletion failed.");
- return false;
- }
+ return fs.delete(specPath, false);
} catch (IOException e) {
throw new IOException(String.format("Issue in removing Spec: %s for Version: %s", specUri, version), e);
}
@@ -221,17 +186,12 @@ public class FSSpecStore implements SpecStore {
@Override
public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
- Preconditions.checkArgument(null != spec, "Spec should not be null");
-
- log.info(String.format("Updating Spec with URI: %s in FSSpecStore: %s", spec.getUri(), this.fsSpecStoreDirPath));
- Path specPath = getPathForURI(this.fsSpecStoreDirPath, spec.getUri(), spec.getVersion());
- writeSpecToFile(specPath, spec);
-
+ addSpec(spec);
return spec;
}
@Override
- public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
+ public Spec getSpec(URI specUri) throws SpecNotFoundException {
Preconditions.checkArgument(null != specUri, "Spec URI should not be null");
Collection<Spec> specs = getAllVersionsOfSpec(specUri);
@@ -296,9 +256,9 @@ public class FSSpecStore implements SpecStore {
* @throws IOException
*/
protected Spec readSpecFromFile(Path path) throws IOException {
- Spec spec = null;
+ Spec spec;
- try (FSDataInputStream fis = fs.open(path);) {
+ try (FSDataInputStream fis = fs.open(path)) {
spec = this.specSerDe.deserialize(ByteStreams.toByteArray(fis));
}
@@ -312,12 +272,8 @@ public class FSSpecStore implements SpecStore {
* @throws IOException
*/
protected void writeSpecToFile(Path specPath, Spec spec) throws IOException {
- if (fs.exists(specPath)) {
- fs.delete(specPath, true);
- }
-
byte[] serializedSpec = this.specSerDe.serialize(spec);
- try (FSDataOutputStream os = fs.create(specPath)) {
+ try (FSDataOutputStream os = fs.create(specPath, true)) {
os.write(serializedSpec);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
index ae2e087..537cdbe 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
@@ -56,7 +56,7 @@ public class FlowCatalogTest {
private static final String SPEC_STORE_DIR = "/tmp/flowTestSpecStore";
private static final String SPEC_GROUP_DIR = "/tmp/flowTestSpecStore/flowTestGroupDir";
private static final String SPEC_DESCRIPTION = "Test Flow Spec";
- private static final String SPEC_VERSION = "1";
+ private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
private ServiceBasedAppLauncher serviceLauncher;
private FlowCatalog flowCatalog;
@@ -144,6 +144,11 @@ public class FlowCatalogTest {
}
@Test (dependsOnMethods = "createFlowSpec")
+ void testExist() throws Exception {
+ Assert.assertTrue(flowCatalog.exists(flowSpec.getUri()));
+ }
+
+ @Test (dependsOnMethods = "testExist")
public void deleteFlowSpec() throws SpecNotFoundException {
// List Current Specs
Collection<Spec> specs = flowCatalog.getSpecs();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
index 48fba40..a3490ff 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class TopologyCatalogTest {
private static final String SPEC_STORE_PARENT_DIR = "/tmp";
private static final String SPEC_STORE_DIR = "/tmp/topologyTestSpecStore";
private static final String SPEC_DESCRIPTION = "Test Topology Spec";
- private static final String SPEC_VERSION = "1";
+ private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
private ServiceBasedAppLauncher serviceLauncher;
private TopologyCatalog topologyCatalog;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 00f8fc2..b20e3b7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -72,7 +72,7 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class GitConfigMonitor extends AbstractIdleService {
private static final String SPEC_DESCRIPTION = "Git-based flow config";
- private static final String SPEC_VERSION = "1";
+ private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
private static final int TERMINATION_TIMEOUT = 30;
private static final int CONFIG_FILE_DEPTH = 3;
private static final String REMOTE_NAME = "origin";
@@ -251,12 +251,7 @@ public class GitConfigMonitor extends AbstractIdleService {
.withDescription(SPEC_DESCRIPTION)
.build();
- try {
this.flowCatalog.remove(spec.getUri());
- } catch (SpecNotFoundException e) {
- // okay if flow does not exist
- log.warn("Flow {} does not exist.", spec.getUri());
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 289e212..ad4180a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -301,16 +301,14 @@ public class GobblinServiceHATest {
// Try create on both nodes
try {
this.node1FlowConfigClient.createFlowConfig(flowConfig1);
- Assert.fail("Get should have gotten a 409 error");
} catch (RestLiResponseException e) {
- Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409);
+ Assert.fail("Create Again should pass without complaining that the spec already exists.");
}
try {
this.node2FlowConfigClient.createFlowConfig(flowConfig2);
- Assert.fail("Get should have gotten a 409 error");
} catch (RestLiResponseException e) {
- Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409);
+ Assert.fail("Create Again should pass without complaining that the spec already exists.");
}
}
@@ -442,16 +440,14 @@ public class GobblinServiceHATest {
try {
this.node1FlowConfigClient.updateFlowConfig(flowConfig);
- Assert.fail("Get should have raised a 404 error");
} catch (RestLiResponseException e) {
- Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
+ Assert.fail("Bad update should pass without complaining that the spec does not exists.");
}
try {
this.node2FlowConfigClient.updateFlowConfig(flowConfig);
- Assert.fail("Get should have raised a 404 error");
} catch (RestLiResponseException e) {
- Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
+ Assert.fail("Bad update should pass without complaining that the spec does not exists.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index b40792e..926dd10 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -66,15 +66,14 @@ public class GobblinServiceManagerTest {
private static final String SERVICE_WORK_DIR = "/tmp/serviceWorkDir/";
private static final String SPEC_STORE_PARENT_DIR = "/tmp/serviceCore/";
private static final String SPEC_DESCRIPTION = "Test ServiceCore";
- private static final String SPEC_VERSION = "1";
private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/serviceCore/topologyTestSpecStore";
private static final String FLOW_SPEC_STORE_DIR = "/tmp/serviceCore/flowTestSpecStore";
private static final String GIT_CLONE_DIR = "/tmp/serviceCore/clone";
private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote";
private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local";
- private static final String TEST_GROUP_NAME = "testGroup1";
- private static final String TEST_FLOW_NAME = "testFlow1";
+ private static final String TEST_GROUP_NAME = "testGroup";
+ private static final String TEST_FLOW_NAME = "testFlow";
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
private static final String TEST_DUMMY_GROUP_NAME = "dummyGroup";
@@ -100,7 +99,7 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".description",
"StandaloneTestExecutor");
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".version",
- "1");
+ FlowSpec.Builder.DEFAULT_VERSION);
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".uri",
"gobblinExecutor");
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance",
@@ -168,8 +167,7 @@ public class GobblinServiceManagerTest {
flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
- .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).
- setRunImmediately(true))
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true))
.setProperties(new StringMap(flowProperties));
this.flowConfigClient.createFlowConfig(flowConfig);
@@ -191,11 +189,8 @@ public class GobblinServiceManagerTest {
try {
this.flowConfigClient.createFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409);
- return;
+ Assert.fail("Create Again should pass without complaining that the spec already exists.");
}
-
- Assert.fail("Get should have gotten a 409 error");
}
@Test (dependsOnMethods = "testCreateAgain")
@@ -207,8 +202,8 @@ public class GobblinServiceManagerTest {
Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE );
Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
- Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
- // Add this asssert back when getFlowSpec() is changed to return the raw flow spec
+ Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+ // Add this assert back when getFlowSpec() is changed to return the raw flow spec
//Assert.assertEquals(flowConfig.getProperties().size(), 1);
Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
}
@@ -333,10 +328,8 @@ public class GobblinServiceManagerTest {
try {
this.flowConfigClient.updateFlowConfig(flowConfig);
} catch (RestLiResponseException e) {
- Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
- return;
+ Assert.fail("Bad update should pass without complaining that the spec does not exists.");
}
-
- Assert.fail("Get should have raised a 404 error");
+ cleanUpDir(FLOW_SPEC_STORE_DIR);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
index 864b238..2dbe790 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java
@@ -65,7 +65,7 @@ public class IdentityFlowToJobSpecCompilerTest {
private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
private static final String SPEC_DESCRIPTION = "Test Orchestrator";
- private static final String SPEC_VERSION = "1";
+ private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis();
private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis();
@@ -145,7 +145,7 @@ public class IdentityFlowToJobSpecCompilerTest {
FLOW_SPEC_STORE_DIR))
.withConfig(config)
.withDescription("dummy description")
- .withVersion("1")
+ .withVersion(SPEC_VERSION)
.withTemplate(new URI(TEST_TEMPLATE_URI));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
index cc722eb..8b08628 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
@@ -76,7 +76,7 @@ public class MultiHopsFlowToJobSpecCompilerTest {
private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
private static final String SPEC_DESCRIPTION = "Test Orchestrator";
- private static final String SPEC_VERSION = "1";
+ private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis();
private static final String TOPOLOGY_SPEC_STORE_DIR_SECOND = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis() + "_2";
private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis();
@@ -288,7 +288,7 @@ public class MultiHopsFlowToJobSpecCompilerTest {
FLOW_SPEC_STORE_DIR))
.withConfig(config)
.withDescription("dummy description")
- .withVersion("1")
+ .withVersion(SPEC_VERSION)
.withTemplate(new URI(TEST_TEMPLATE_URI));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 8896068..6d75f9e 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -57,7 +57,7 @@ public class OrchestratorTest {
private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
private static final String SPEC_DESCRIPTION = "Test Orchestrator";
- private static final String SPEC_VERSION = "1";
+ private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore";
private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore";
private static final String FLOW_SPEC_GROUP_DIR = "/tmp/orchestrator/flowTestSpecStore/flowTestGroupDir";