You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/05/10 20:27:42 UTC
falcon git commit: FALCON-1085 Support Cluster entity updates in
Falcon Server
Repository: falcon
Updated Branches:
refs/heads/master bb6032b2c -> f3ff8b27f
FALCON-1085 Support Cluster entity updates in Falcon Server
Added basic documentation, https://issues.apache.org/jira/browse/FALCON-1937 will contain detailed documentation.
Author: bvellanki <bv...@hortonworks.com>
Reviewers: "Venkat Ranganathan <ve...@hortonworks.com>, yzheng-hortonworks <yz...@hortonworks.com>, peeyush b <pb...@hortonworks.com>"
Closes #127 from bvellanki/FALCON-1085
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f3ff8b27
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f3ff8b27
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f3ff8b27
Branch: refs/heads/master
Commit: f3ff8b27f0a77d802306f0fc9ffdff51ae6c7486
Parents: bb6032b
Author: bvellanki <bv...@hortonworks.com>
Authored: Tue May 10 13:27:33 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Tue May 10 13:27:33 2016 -0700
----------------------------------------------------------------------
.../org/apache/falcon/cli/FalconEntityCLI.java | 9 +-
.../org/apache/falcon/FalconCLIConstants.java | 1 +
.../org/apache/falcon/client/FalconClient.java | 9 ++
client/src/main/resources/cluster-0.1.xsd | 1 +
client/src/main/resources/datasource-0.1.xsd | 3 +-
client/src/main/resources/feed-0.1.xsd | 2 +
client/src/main/resources/process-0.1.xsd | 2 +
.../org/apache/falcon/entity/ClusterHelper.java | 50 ++++++-
.../falcon/entity/ColoClusterRelation.java | 3 +-
.../org/apache/falcon/entity/EntityUtil.java | 69 +++++++++
.../falcon/entity/parser/FeedEntityParser.java | 8 ++
.../entity/parser/ProcessEntityParser.java | 8 ++
.../falcon/entity/store/ConfigurationStore.java | 18 ++-
.../EntityRelationshipGraphBuilder.java | 28 +++-
.../org/apache/falcon/update/UpdateHelper.java | 37 +++++
.../falcon/entity/ColoClusterRelationTest.java | 20 +++
.../apache/falcon/entity/EntityUtilTest.java | 21 +++
.../entity/parser/ClusterEntityParserTest.java | 3 +
.../parser/DatasourceEntityParserTest.java | 1 +
.../entity/parser/FeedEntityParserTest.java | 10 +-
.../entity/parser/ProcessEntityParserTest.java | 12 ++
.../entity/store/ConfigurationStoreTest.java | 31 +++++
.../metadata/MetadataMappingServiceTest.java | 23 +++
.../apache/falcon/update/UpdateHelperTest.java | 64 +++++++++
.../resources/config/process/process-0.1.xml | 2 +-
.../src/site/twiki/falconcli/UpdateEntity.twiki | 7 +-
docs/src/site/twiki/restapi/EntityUpdate.twiki | 4 +-
.../falcon/resource/AbstractEntityManager.java | 95 ++++++++++++-
.../proxy/SchedulableEntityManagerProxy.java | 86 +++++++++---
.../org/apache/falcon/unit/TestFalconUnit.java | 6 +-
.../falcon/resource/ConfigSyncService.java | 16 +++
.../resource/SchedulableEntityManager.java | 33 ++++-
.../falcon/cli/FalconClusterUpdateCLIIT.java | 139 +++++++++++++++++++
.../apache/falcon/cli/FalconSafemodeCLIIT.java | 3 -
.../org/apache/falcon/resource/TestContext.java | 1 +
.../test/resources/cluster-updated-template.xml | 42 ++++++
36 files changed, 817 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
index 37a6992..78b2225 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconEntityCLI.java
@@ -66,6 +66,8 @@ public class FalconEntityCLI extends FalconCLI {
"Submits an entity xml to Falcon");
Option update = new Option(FalconCLIConstants.UPDATE_OPT, false,
"Updates an existing entity xml");
+ Option updateClusterDependents = new Option(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT, false,
+ "Updates dependent entities of a cluster in workflow engine");
Option schedule = new Option(FalconCLIConstants.SCHEDULE_OPT, false,
"Schedules a submited entity in Falcon");
Option suspend = new Option(FalconCLIConstants.SUSPEND_OPT, false,
@@ -96,6 +98,7 @@ public class FalconEntityCLI extends FalconCLI {
OptionGroup group = new OptionGroup();
group.addOption(submit);
group.addOption(update);
+ group.addOption(updateClusterDependents);
group.addOption(schedule);
group.addOption(suspend);
group.addOption(resume);
@@ -217,7 +220,8 @@ public class FalconEntityCLI extends FalconCLI {
}
EntityType entityTypeEnum = null;
- if (optionsList.contains(FalconCLIConstants.LIST_OPT)) {
+ if (optionsList.contains(FalconCLIConstants.LIST_OPT)
+ || optionsList.contains(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT)) {
if (entityType == null) {
entityType = "";
}
@@ -255,6 +259,9 @@ public class FalconEntityCLI extends FalconCLI {
validateColo(optionsList);
validateNotEmpty(entityName, FalconCLIConstants.ENTITY_NAME_OPT);
result = client.update(entityType, entityName, filePath, skipDryRun, doAsUser).getMessage();
+ } else if (optionsList.contains(FalconCLIConstants.UPDATE_CLUSTER_DEPENDENTS_OPT)) {
+ validateNotEmpty(cluster, FalconCLIConstants.CLUSTER_OPT);
+ result = client.updateClusterDependents(cluster, skipDryRun, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateNotEmpty(filePath, "file");
validateColo(optionsList);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
index 1db5cfe..31ead63 100644
--- a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
+++ b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java
@@ -47,6 +47,7 @@ public final class FalconCLIConstants {
public static final String VERSION_OPT = "version";
public static final String SUBMIT_OPT = "submit";
public static final String UPDATE_OPT = "update";
+ public static final String UPDATE_CLUSTER_DEPENDENTS_OPT = "updateClusterDependents";
public static final String DELETE_OPT = "delete";
public static final String SUBMIT_AND_SCHEDULE_OPT = "submitAndSchedule";
public static final String VALIDATE_OPT = "validate";
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 7a48973..1014d64 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -236,6 +236,7 @@ public class FalconClient extends AbstractFalconClient {
VALIDATE("api/entities/validate/", HttpMethod.POST, MediaType.TEXT_XML),
SUBMIT("api/entities/submit/", HttpMethod.POST, MediaType.TEXT_XML),
UPDATE("api/entities/update/", HttpMethod.POST, MediaType.TEXT_XML),
+ UPDATEDEPENDENTS("api/entities/updateClusterDependents/", HttpMethod.POST, MediaType.TEXT_XML),
SUBMITANDSCHEDULE("api/entities/submitAndSchedule/", HttpMethod.POST, MediaType.TEXT_XML),
SCHEDULE("api/entities/schedule/", HttpMethod.POST, MediaType.TEXT_XML),
SUSPEND("api/entities/suspend/", HttpMethod.POST, MediaType.TEXT_XML),
@@ -430,6 +431,14 @@ public class FalconClient extends AbstractFalconClient {
return getResponse(APIResult.class, clientResponse);
}
+ public APIResult updateClusterDependents(String clusterName, Boolean skipDryRun,
+ String doAsUser) throws FalconCLIException {
+ ClientResponse clientResponse = new ResourceBuilder().path(Entities.UPDATEDEPENDENTS.path, clusterName)
+ .addQueryParam(SKIP_DRYRUN, skipDryRun).addQueryParam(DO_AS_OPT, doAsUser)
+ .call(Entities.UPDATEDEPENDENTS);
+ return getResponse(APIResult.class, clientResponse);
+ }
+
@Override
public APIResult submitAndSchedule(String entityType, String filePath, Boolean skipDryRun,
String doAsUser, String properties) throws FalconCLIException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/cluster-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/cluster-0.1.xsd b/client/src/main/resources/cluster-0.1.xsd
index 0e0ada8..03e9f84 100644
--- a/client/src/main/resources/cluster-0.1.xsd
+++ b/client/src/main/resources/cluster-0.1.xsd
@@ -75,6 +75,7 @@
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:string" name="description"/>
<xs:attribute type="xs:string" name="colo" use="required"/>
+ <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>
<xs:complexType name="locations">
<xs:annotation>
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/datasource-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/datasource-0.1.xsd b/client/src/main/resources/datasource-0.1.xsd
index 1202ba1..ef78239 100644
--- a/client/src/main/resources/datasource-0.1.xsd
+++ b/client/src/main/resources/datasource-0.1.xsd
@@ -76,6 +76,7 @@
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:string" name="colo" use="required"/>
<xs:attribute type="xs:string" name="description"/>
+ <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
<xs:attribute type="datasource-type" name="type" use="required">
<xs:annotation>
<xs:documentation>
@@ -263,7 +264,7 @@
<xs:complexType name="ACL">
<xs:annotation>
<xs:documentation>
- Access control list for this cluster.
+ Access control list for this Entity.
owner is the Owner of this entity.
group is the one which has access to read - not used at this time.
permission is not enforced at this time
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/feed-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/feed-0.1.xsd b/client/src/main/resources/feed-0.1.xsd
index 77e8663..3488233 100644
--- a/client/src/main/resources/feed-0.1.xsd
+++ b/client/src/main/resources/feed-0.1.xsd
@@ -129,6 +129,7 @@
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:string" name="description"/>
+ <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>
<xs:complexType name="cluster">
<xs:annotation>
@@ -168,6 +169,7 @@
<xs:attribute type="cluster-type" name="type" use="optional"/>
<xs:attribute type="xs:string" name="partition" use="optional"/>
<xs:attribute type="frequency-type" name="delay" use="optional" />
+ <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>
<xs:complexType name="partitions">
<xs:annotation>
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 456ebf9..0d01e33 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -188,6 +188,7 @@
<xs:element type="ACL" name="ACL" minOccurs="0"/>
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
+ <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>
<xs:simpleType name="IDENTIFIER">
@@ -219,6 +220,7 @@
<xs:element type="validity" name="validity"/>
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
+ <xs:attribute type="xs:int" name="version" use="optional" default="0"/>
</xs:complexType>
<xs:complexType name="validity">
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 9d79742..aff4405 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -32,6 +32,8 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.SecurityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
@@ -45,7 +47,7 @@ public final class ClusterHelper {
public static final String WORKINGDIR = "working";
public static final String NO_USER_BROKER_URL = "NA";
public static final String EMPTY_DIR_NAME = "EMPTY_DIR_DONT_DELETE";
-
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterHelper.class);
private ClusterHelper() {
}
@@ -123,6 +125,9 @@ public final class ClusterHelper {
}
public static Interface getInterface(Cluster cluster, Interfacetype type) {
+ if (cluster.getInterfaces() == null) {
+ return null;
+ }
for (Interface interf : cluster.getInterfaces().getInterfaces()) {
if (interf.getType() == type) {
return interf;
@@ -143,6 +148,9 @@ public final class ClusterHelper {
public static Location getLocation(Cluster cluster, ClusterLocationType clusterLocationType) {
+ if (cluster.getLocations() == null) {
+ return null;
+ }
for (Location loc : cluster.getLocations().getLocations()) {
if (loc.getName().equals(clusterLocationType)) {
return loc;
@@ -211,4 +219,44 @@ public final class ClusterHelper {
return getStorageUrl(cluster) + getLocation(cluster, ClusterLocationType.STAGING).getPath()
+ "/" + EMPTY_DIR_NAME;
}
+
+ public static boolean matchInterface(final Cluster oldEntity, final Cluster newEntity,
+ final Interfacetype interfaceType) {
+ Interface oldInterface = getInterface(oldEntity, interfaceType);
+ Interface newInterface = getInterface(newEntity, interfaceType);
+ String oldEndpoint = (oldInterface == null) ? null : oldInterface.getEndpoint();
+ String newEndpoint = (newInterface == null) ? null : newInterface.getEndpoint();
+ LOG.debug("Verifying if Interfaces match for cluster {} : Old - {}, New - {}",
+ interfaceType.name(), oldEndpoint, newEndpoint);
+ return StringUtils.isBlank(oldEndpoint) && StringUtils.isBlank(newEndpoint)
+ || StringUtils.isNotBlank(oldEndpoint) && oldEndpoint.equalsIgnoreCase(newEndpoint);
+ }
+
+ public static boolean matchLocations(final Cluster oldEntity, final Cluster newEntity,
+ final ClusterLocationType locationType) {
+ Location oldLocation = getLocation(oldEntity, locationType);
+ Location newLocation = getLocation(newEntity, locationType);
+ String oldLocationPath = (oldLocation == null) ? null : oldLocation.getPath();
+ String newLocationPath = (newLocation == null) ? null : newLocation.getPath();
+ LOG.debug("Verifying if Locations match {} : Old - {}, New - {}",
+ locationType.name(), oldLocationPath, newLocationPath);
+ return StringUtils.isBlank(oldLocationPath) && StringUtils.isBlank(newLocationPath)
+ || StringUtils.isNotBlank(oldLocationPath) && oldLocationPath.equalsIgnoreCase(newLocationPath);
+ }
+
+ public static boolean matchProperties(final Cluster oldEntity, final Cluster newEntity) {
+ Map<String, String> oldProps = getClusterProperties(oldEntity);
+ Map<String, String> newProps = getClusterProperties(newEntity);
+ return oldProps.equals(newProps);
+ }
+
+ private static Map<String, String> getClusterProperties(final Cluster cluster) {
+ Map<String, String> returnProps = new HashMap<String, String>();
+ if (cluster.getProperties() != null) {
+ for (Property prop : cluster.getProperties().getProperties()) {
+ returnProps.put(prop.getName(), prop.getValue());
+ }
+ }
+ return returnProps;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
index e4ca91b..a141e43 100644
--- a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
+++ b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
@@ -79,7 +79,8 @@ public final class ColoClusterRelation implements ConfigurationChangeListener {
if (oldEntity.getEntityType() != EntityType.CLUSTER) {
return;
}
- throw new FalconException("change shouldn't be supported on cluster!");
+ onRemove(oldEntity);
+ onAdd(newEntity);
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index b181ece..51172f2 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -35,6 +35,7 @@ import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.datasource.DatasourceType;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.ClusterType;
@@ -130,6 +131,7 @@ public final class EntityUtil {
public enum ENTITY_OPERATION {
SUBMIT,
UPDATE,
+ UPDATE_CLUSTER_DEPENDENTS,
SCHEDULE,
SUBMIT_AND_SCHEDULE,
DELETE,
@@ -706,6 +708,40 @@ public final class EntityUtil {
}
}
+ public static Integer getVersion(final Entity entity) throws FalconException {
+ switch (entity.getEntityType()) {
+ case FEED:
+ return ((Feed)entity).getVersion();
+ case PROCESS:
+ return ((Process)entity).getVersion();
+ case CLUSTER:
+ return ((Cluster)entity).getVersion();
+ case DATASOURCE:
+ return ((Datasource)entity).getVersion();
+ default:
+ throw new FalconException("Invalid entity type:" + entity.getEntityType());
+ }
+ }
+
+ public static void setVersion(Entity entity, final Integer version) throws FalconException {
+ switch (entity.getEntityType()) {
+ case FEED:
+ ((Feed)entity).setVersion(version);
+ break;
+ case PROCESS:
+ ((Process)entity).setVersion(version);
+ break;
+ case CLUSTER:
+ ((Cluster)entity).setVersion(version);
+ break;
+ case DATASOURCE:
+ ((Datasource)entity).setVersion(version);
+ break;
+ default:
+ throw new FalconException("Invalid entity type:" + entity.getEntityType());
+ }
+ }
+
//Staging path that stores scheduler configs like oozie coord/bundle xmls, parent workflow xml
//Each entity update creates a new staging path
//Base staging path is the base path for all staging dirs
@@ -1123,4 +1159,37 @@ public final class EntityUtil {
return instancePath;
}
+ /**
+ * Returns true if entity is dependent on cluster, else returns false.
+ * @param entity
+ * @param clusterName
+ * @return
+ */
+ public static boolean isEntityDependentOnCluster(Entity entity, String clusterName) {
+ switch (entity.getEntityType()) {
+ case CLUSTER:
+ return entity.getName().equalsIgnoreCase(clusterName);
+
+ case FEED:
+ Feed feed = (Feed) entity;
+ for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+ if (cluster.getName().equalsIgnoreCase(clusterName)) {
+ return true;
+ }
+ }
+ break;
+
+ case PROCESS:
+ Process process = (Process) entity;
+ for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+ if (cluster.getName().equalsIgnoreCase(clusterName)) {
+ return true;
+ }
+ }
+ break;
+ default:
+ }
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 2a9a852..28fdaf8 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -97,6 +97,14 @@ public class FeedEntityParser extends EntityParser<Feed> {
cluster.getValidity().setEnd(DateUtil.NEVER);
}
+ // set Cluster version
+ int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion();
+ if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) {
+ throw new ValidationException("Feed should not set cluster to a version that does not exist");
+ } else {
+ cluster.setVersion(clusterVersion);
+ }
+
validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(),
cluster.getName());
validateClusterHasRegistry(feed, cluster);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index 16fd8b3..8edec5b 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -91,6 +91,14 @@ public class ProcessEntityParser extends EntityParser<Process> {
cluster.getValidity().setEnd(DateUtil.NEVER);
}
+ // set Cluster version
+ int clusterVersion = ClusterHelper.getCluster(cluster.getName()).getVersion();
+ if (cluster.getVersion() > 0 && cluster.getVersion() > clusterVersion) {
+ throw new ValidationException("Process should not set cluster to a version that does not exist");
+ } else {
+ cluster.setVersion(clusterVersion);
+ }
+
validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
validateHDFSPaths(process, clusterName);
validateProperties(process);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index bdcd1af..7f2b172 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -20,12 +20,15 @@ package org.apache.falcon.entity.store;
import org.apache.commons.codec.CharEncoding;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
+import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FileStatus;
@@ -242,9 +245,10 @@ public final class ConfigurationStore implements FalconService {
private synchronized void updateInternal(EntityType type, Entity entity) throws FalconException {
try {
if (get(type, entity.getName()) != null) {
- persist(type, entity);
ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
Entity oldEntity = entityMap.get(entity.getName());
+ updateVersion(oldEntity, entity);
+ persist(type, entity);
onChange(oldEntity, entity);
entityMap.put(entity.getName(), entity);
} else {
@@ -256,6 +260,18 @@ public final class ConfigurationStore implements FalconService {
AUDIT.info(type + "/" + entity.getName() + " is replaced into config store");
}
+ private void updateVersion(Entity oldentity, Entity newEntity) throws FalconException {
+ // increase version number for cluster only if dependent feeds/process needs to be updated.
+ if (oldentity.getEntityType().equals(EntityType.CLUSTER)) {
+ if (UpdateHelper.isClusterEntityUpdated((Cluster) oldentity, (Cluster) newEntity)) {
+ EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity) + 1);
+ }
+ } else if (!EntityUtil.equals(oldentity, newEntity)) {
+ // Increase version for other entities if they actually changed.
+ EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity));
+ }
+ }
+
public synchronized void update(EntityType type, Entity entity) throws FalconException {
if (updatesInProgress.get() == entity) {
try {
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
index 25bbf0c..e6851df 100644
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
@@ -120,7 +120,7 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
EntityType entityType = oldEntity.getEntityType();
switch (entityType) {
case CLUSTER:
- // a cluster cannot be updated
+ updateClusterEntity((Cluster) oldEntity, (Cluster) newEntity);
break;
case PROCESS:
updateProcessEntity((Process) oldEntity, (Process) newEntity);
@@ -133,7 +133,33 @@ public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
}
+ private void updateClusterEntity(Cluster oldCluster, Cluster newCluster) {
+ LOG.info("Updating Cluster entity: {}", newCluster.getName());
+ Vertex clusterEntityVertex = findVertex(oldCluster.getName(), RelationshipType.CLUSTER_ENTITY);
+ if (clusterEntityVertex == null) {
+ LOG.error("Illegal State: Cluster entity vertex must exist for {}", oldCluster.getName());
+ throw new IllegalStateException(oldCluster.getName() + " entity vertex must exist.");
+ }
+ updateColoEdge(oldCluster.getColo(), newCluster.getColo(), clusterEntityVertex);
+ updateDataClassification(oldCluster.getTags(), newCluster.getTags(), clusterEntityVertex);
+ }
+
+ private void updateColoEdge(String oldColo, String newColo, Vertex clusterEntityVertex) {
+ if (areSame(oldColo, newColo)) {
+ return;
+ }
+ Vertex oldColoVertex = findVertex(oldColo, RelationshipType.COLO);
+ if (oldColoVertex != null) {
+ removeEdge(clusterEntityVertex, oldColoVertex, RelationshipLabel.CLUSTER_COLO.getName());
+ }
+ Vertex newColoVertex = findVertex(newColo, RelationshipType.COLO);
+ if (newColoVertex == null) {
+ newColoVertex = addVertex(newColo, RelationshipType.COLO);
+ }
+
+ addEdge(clusterEntityVertex, newColoVertex, RelationshipLabel.CLUSTER_COLO.getName());
+ }
public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
LOG.info("Updating feed entity: {}", newFeed.getName());
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 6603bc6..ae88a01 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -20,12 +20,15 @@ package org.apache.falcon.update;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Process;
@@ -71,6 +74,10 @@ public final class UpdateHelper {
case PROCESS:
return !EntityUtil.equals(oldView, newView, PROCESS_FIELDS);
+ case CLUSTER:
+ return isClusterEntityUpdated((org.apache.falcon.entity.v0.cluster.Cluster) oldEntity,
+ (org.apache.falcon.entity.v0.cluster.Cluster) newEntity);
+
default:
}
throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType());
@@ -129,4 +136,34 @@ public final class UpdateHelper {
throw new FalconException("Don't know what to do. Unexpected scenario");
}
}
+
+ public static boolean isClusterEntityUpdated(final org.apache.falcon.entity.v0.cluster.Cluster oldEntity,
+ final org.apache.falcon.entity.v0.cluster.Cluster newEntity) {
+ /*
+ * Name should not be updated.
+ * interface, locations, properties, colo : Update bundle/coord for dependent entities.
+ * Description, tags, ACL : no need to update bundle/coord for dependent entities.
+ */
+ if (!oldEntity.getColo().equals(newEntity.getColo())) {
+ return true;
+ }
+
+ for(Interfacetype interfacetype : Interfacetype.values()) {
+ if (!ClusterHelper.matchInterface(oldEntity, newEntity, interfacetype)) {
+ return true;
+ }
+ }
+
+ for(ClusterLocationType locationType : ClusterLocationType.values()) {
+ if (!ClusterHelper.matchLocations(oldEntity, newEntity, locationType)) {
+ return true;
+ }
+ }
+
+ if (!ClusterHelper.matchProperties(oldEntity, newEntity)) {
+ return true;
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java b/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java
index 0d6e754..2abcece 100644
--- a/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/ColoClusterRelationTest.java
@@ -69,5 +69,25 @@ public class ColoClusterRelationTest extends AbstractTestBase {
clusters = relation.getClusters("colo1");
Assert.assertNotNull(clusters);
Assert.assertEquals(0, clusters.size());
+
+ Cluster updatedCluster3 = new Cluster();
+ updatedCluster3.setName(cluster3.getName());
+ updatedCluster3.setColo("colo3");
+ try {
+ getStore().initiateUpdate(updatedCluster3);
+ getStore().update(EntityType.CLUSTER, updatedCluster3);
+ } finally {
+ getStore().cleanupUpdateInit();
+ }
+
+ relation = ColoClusterRelation.get();
+ clusters = relation.getClusters("colo3");
+ Assert.assertNotNull(clusters);
+ Assert.assertEquals(1, clusters.size());
+ Assert.assertTrue(clusters.contains(updatedCluster3.getName()));
+
+ clusters = relation.getClusters("colo2");
+ Assert.assertNotNull(clusters);
+ Assert.assertEquals(0, clusters.size());
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
index c87449c..766b2fa 100644
--- a/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/EntityUtilTest.java
@@ -450,4 +450,25 @@ public class EntityUtilTest extends AbstractTestBase {
// Ensure latest is returned.
Assert.assertEquals(EntityUtil.getLatestStagingPath(cluster, process).getName(), md5 + "_1436357052992");
}
+
+ @Test
+ public void testIsClusterUsedByEntity() throws Exception {
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(
+ getClass().getResourceAsStream(PROCESS_XML));
+ Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
+ getClass().getResourceAsStream(FEED_XML));
+ org.apache.falcon.entity.v0.cluster.Cluster cluster =
+ (org.apache.falcon.entity.v0.cluster.Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(
+ getClass().getResourceAsStream(CLUSTER_XML));
+
+ Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(cluster, "testCluster"));
+ Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(feed, "testCluster"));
+ Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(feed, "backupCluster"));
+ Assert.assertTrue(EntityUtil.isEntityDependentOnCluster(process, "testCluster"));
+
+ Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(cluster, "fakeCluster"));
+ Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(feed, "fakeCluster"));
+ Assert.assertFalse(EntityUtil.isEntityDependentOnCluster(process, "fakeCluster"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
index c45909f..4b4b657 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ClusterEntityParserTest.java
@@ -175,6 +175,9 @@ public class ClusterEntityParserTest extends AbstractTestBase {
// Good set of properties, should work
clusterEntityParser.validateProperties(cluster);
+ // validate version
+ Assert.assertEquals(cluster.getVersion(), 0);
+
// add duplicate property, should throw validation exception.
Property property1 = new Property();
property1.setName("field1");
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
index 3893917..6ade9c9 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/DatasourceEntityParserTest.java
@@ -76,6 +76,7 @@ public class DatasourceEntityParserTest extends AbstractTestBase {
Assert.assertEquals("test-hsql-db", databaseEntity.getName());
Assert.assertEquals("hsql", databaseEntity.getType().value());
Assert.assertEquals("org.hsqldb.jdbcDriver", databaseEntity.getDriver().getClazz());
+ Assert.assertEquals(datasource.getVersion(), 0);
}
@Test
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index ceec3c4..c642fb8 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -35,11 +35,11 @@ import org.apache.falcon.entity.v0.feed.ActionType;
import org.apache.falcon.entity.v0.feed.Argument;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.ExtractMethod;
+import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.feed.MergeType;
-import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Partition;
import org.apache.falcon.entity.v0.feed.Partitions;
import org.apache.falcon.entity.v0.feed.Property;
@@ -87,11 +87,13 @@ public class FeedEntityParserTest extends AbstractTestBase {
Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass()
.getResourceAsStream(CLUSTER_XML));
cluster.setName("testCluster");
+ cluster.setVersion(0);
store.publish(EntityType.CLUSTER, cluster);
cluster = (Cluster) unmarshaller.unmarshal(this.getClass()
.getResourceAsStream(CLUSTER_XML));
cluster.setName("backupCluster");
+ cluster.setVersion(1);
store.publish(EntityType.CLUSTER, cluster);
LifecyclePolicyMap.get().init();
@@ -123,11 +125,14 @@ public class FeedEntityParserTest extends AbstractTestBase {
assertEquals(feed.getSla().getSlaHigh().toString(), "hours(3)");
assertEquals(feed.getSla().getSlaLow().toString(), "hours(2)");
assertEquals(feed.getGroups(), "online,bi");
+ Assert.assertEquals(feed.getVersion(), 0);
assertEquals(feed.getClusters().getClusters().get(0).getName(),
"testCluster");
assertEquals(feed.getClusters().getClusters().get(0).getSla().getSlaLow().toString(), "hours(3)");
assertEquals(feed.getClusters().getClusters().get(0).getSla().getSlaHigh().toString(), "hours(4)");
+ assertEquals(feed.getClusters().getClusters().get(0).getVersion(), 0);
+ assertEquals(feed.getClusters().getClusters().get(1).getVersion(), 1);
assertEquals(feed.getClusters().getClusters().get(0).getType(),
ClusterType.SOURCE);
@@ -633,6 +638,7 @@ public class FeedEntityParserTest extends AbstractTestBase {
Cluster cluster = (Cluster) unmarshaller.unmarshal(this.getClass()
.getResourceAsStream(("/config/cluster/cluster-no-registry.xml")));
cluster.setName("badTestCluster");
+ cluster.setVersion(0);
ConfigurationStore.get().publish(EntityType.CLUSTER, cluster);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index 7159966..64f62a5 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -121,6 +121,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()), "2011-11-02T00:00Z");
Assert.assertEquals(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()), "2091-12-30T00:00Z");
Assert.assertEquals(process.getTimezone().getID(), "UTC");
+ Assert.assertEquals(processCluster.getVersion(), 0);
Assert.assertEquals(process.getSla().getShouldStartIn().toString(), "hours(2)");
Assert.assertEquals(process.getSla().getShouldEndIn().toString(), "hours(4)");
@@ -386,6 +387,17 @@ public class ProcessEntityParserTest extends AbstractTestBase {
}
@Test
+ public void testValidateVersion() throws Exception {
+ InputStream stream = this.getClass().getResourceAsStream(PROCESS_XML);
+
+ Process process = parser.parse(stream);
+ Assert.assertEquals(process.getVersion(), 0);
+ process.setVersion(10);
+ parser.validate(process);
+ Assert.assertEquals(process.getVersion(), 10);
+ }
+
+ @Test
public void testValidateACLWithACLAndAuthorizationDisabled() throws Exception {
InputStream stream = this.getClass().getResourceAsStream("/config/process/process-table.xml");
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
index fa3d3f4..8056e80 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
@@ -19,8 +19,10 @@
package org.apache.falcon.entity.store;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.util.StartupProperties;
@@ -96,6 +98,7 @@ public class ConfigurationStoreTest {
store.publish(EntityType.PROCESS, process);
Process p = store.get(EntityType.PROCESS, "hello");
Assert.assertEquals(p, process);
+ Assert.assertEquals(p.getVersion(), 0);
store.registerListener(listener);
process.setName("world");
@@ -109,6 +112,34 @@ public class ConfigurationStoreTest {
}
@Test
+ public void testUpdate() throws Exception {
+ Cluster cluster1 = createClusterObj();
+ store.publish(EntityType.CLUSTER, cluster1);
+ Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 0);
+
+ Cluster cluster2 = createClusterObj();
+ cluster2.setDescription("new Desc");
+ store.initiateUpdate(cluster2);
+ store.update(EntityType.CLUSTER, cluster2);
+ store.cleanupUpdateInit();
+ Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 0);
+
+ Cluster cluster3 = createClusterObj();
+ cluster3.setColo("newColo");
+ store.initiateUpdate(cluster3);
+ store.update(EntityType.CLUSTER, cluster3);
+ store.cleanupUpdateInit();
+ Assert.assertEquals(EntityUtil.getVersion(store.get(EntityType.CLUSTER, "cluster1")).intValue(), 1);
+ }
+
+ private Cluster createClusterObj() {
+ Cluster cluster = new Cluster();
+ cluster.setName("cluster1");
+ cluster.setColo("colo1");
+ return cluster;
+ }
+
+ @Test
public void testGet() throws Exception {
Process p = store.get(EntityType.PROCESS, "notfound");
Assert.assertNull(p);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 29f933d..228f522 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -606,6 +606,29 @@ public class MetadataMappingServiceTest {
verifyLineageGraphForJobCounters(context);
}
+ @Test(dependsOnMethods = "testOnFeedEntityChange")
+ public void testOnClusterEntityChange() throws Exception {
+ long beforeVerticesCount = getVerticesCount(service.getGraph());
+ long beforeEdgesCount = getEdgesCount(service.getGraph());
+
+ Cluster oldCluster = clusterEntity;
+ Cluster newCluster = EntityBuilderTestUtil.buildCluster(oldCluster.getName(),
+ "clusterUpdateColo", oldCluster.getTags() + ",clusterUpdateTagKey=clusterUpdateTagVal");
+
+ try {
+ configStore.initiateUpdate(newCluster);
+ configStore.update(EntityType.CLUSTER, newCluster);
+ } finally {
+ configStore.cleanupUpdateInit();
+ }
+
+ Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 2); // +1 new tag +1 new colo
+ Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 1); // +1 new tag edge
+ Vertex newClusterVertex = getEntityVertex(newCluster.getName(), RelationshipType.CLUSTER_ENTITY);
+ verifyVertexForEdge(newClusterVertex, Direction.OUT, RelationshipLabel.CLUSTER_COLO.getName(),
+ "clusterUpdateColo", RelationshipType.COLO.getName());
+ }
+
private void verifyUpdatedEdges(Process newProcess) {
Vertex processVertex = getEntityVertex(newProcess.getName(), RelationshipType.PROCESS_ENTITY);
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
index 3e48e26..52b7103 100644
--- a/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/update/UpdateHelperTest.java
@@ -21,6 +21,7 @@ package org.apache.falcon.update;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.parser.EntityParserFactory;
@@ -30,7 +31,11 @@ import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.ACL;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
@@ -50,6 +55,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import javax.xml.bind.Unmarshaller;
import java.io.IOException;
import java.io.InputStream;
@@ -301,6 +307,64 @@ public class UpdateHelperTest extends AbstractTestBase {
Assert.assertTrue(UpdateHelper.isEntityUpdated(newProcess, newerProcess, cluster, procPath));
}
+ @Test
+ public void testIsClusterEntityUpdated() throws Exception {
+ Unmarshaller unmarshaller = EntityType.CLUSTER.getUnmarshaller();
+
+ String cluster = "testCluster";
+ Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+ Cluster newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+ newClusterEntity.setName(cluster);
+ Assert.assertNotNull(newClusterEntity);
+
+ // Tags, ACL, description update should not update bundle/workflow for dependent entities
+ ACL acl = new ACL();
+ acl.setOwner("Test");
+ acl.setGroup("testGroup");
+ acl.setPermission("*");
+ newClusterEntity.setACL(acl);
+ newClusterEntity.setDescription("New Description");
+ newClusterEntity.setTags("test=val,test2=val2");
+ Assert.assertFalse(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+
+ // Changing colo should trigger update
+ newClusterEntity.setColo("NewColoValue");
+ Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+
+
+ // Updating an interface should trigger update bundle/workflow for dependent entities
+ Interface writeInterface = ClusterHelper.getInterface(newClusterEntity, Interfacetype.WRITE);
+ newClusterEntity.getInterfaces().getInterfaces().remove(writeInterface);
+ Assert.assertNotNull(writeInterface);
+ writeInterface.setEndpoint("hdfs://test.host.name:8020");
+ writeInterface.setType(Interfacetype.WRITE);
+ writeInterface.setVersion("2.2.0");
+ newClusterEntity.getInterfaces().getInterfaces().add(writeInterface);
+ Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+
+ // Updating a property should trigger update bundle/workflow for dependent entities
+ newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+ newClusterEntity.setName(cluster);
+ Assert.assertNotNull(newClusterEntity);
+ org.apache.falcon.entity.v0.cluster.Property property = new org.apache.falcon.entity.v0.cluster.Property();
+ property.setName("testName");
+ property.setValue("testValue");
+ newClusterEntity.getProperties().getProperties().add(property);
+ Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+
+ // Updating a location should trigger update bundle/workflow for dependent entities
+ newClusterEntity = (Cluster) unmarshaller.unmarshal(this.getClass().getResource(CLUSTER_XML));
+ newClusterEntity.setName(cluster);
+ Assert.assertNotNull(newClusterEntity);
+ org.apache.falcon.entity.v0.cluster.Location stagingLocation =
+ ClusterHelper.getLocation(newClusterEntity, ClusterLocationType.STAGING);
+ Assert.assertNotNull(stagingLocation);
+ newClusterEntity.getInterfaces().getInterfaces().remove(stagingLocation);
+ stagingLocation.setPath("/test/path/here");
+ newClusterEntity.getLocations().getLocations().add(stagingLocation);
+ Assert.assertTrue(UpdateHelper.isClusterEntityUpdated(clusterEntity, newClusterEntity));
+ }
+
private static Location getLocation(Feed feed, LocationType type, String cluster) {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster);
if (feedCluster.getLocations() != null) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/common/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml
index 039208c..4ce7ad1 100644
--- a/common/src/test/resources/config/process/process-0.1.xml
+++ b/common/src/test/resources/config/process/process-0.1.xml
@@ -16,7 +16,7 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<process name="sample" xmlns="uri:falcon:process:0.1">
+<process name="sample" version="0" xmlns="uri:falcon:process:0.1">
<tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
<pipelines>testPipeline,dataReplication_Pipeline</pipelines>
<clusters>
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/docs/src/site/twiki/falconcli/UpdateEntity.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/falconcli/UpdateEntity.twiki b/docs/src/site/twiki/falconcli/UpdateEntity.twiki
index 5d49a76..146a60f 100644
--- a/docs/src/site/twiki/falconcli/UpdateEntity.twiki
+++ b/docs/src/site/twiki/falconcli/UpdateEntity.twiki
@@ -2,12 +2,15 @@
[[CommonCLI][Common CLI Options]]
-Update operation allows an already submitted/scheduled entity to be updated and put it into the archive.Archive path is defined in startup.properties in variable "config.store.uri". Cluster and datasource updates are currently not allowed.
+Update operation allows an already submitted/scheduled entity to be updated and put it into the archive.Archive path is defined in startup.properties in variable "config.store.uri". Datasource updates are currently not allowed.
Usage:
-$FALCON_HOME/bin/falcon entity -type [feed|process] -name <<name>> -update -file <<path_to_file>>
+$FALCON_HOME/bin/falcon entity -type [cluster|feed|process] -name <<name>> -update -file <<path_to_file>>
Optional Arg : -skipDryRun. When this argument is specified, Falcon skips oozie dryrun.
Example:
$FALCON_HOME/bin/falcon entity -type process -name hourly-reports-generator -update -file /process/definition.xml
+
+Note: When a cluster entity is updated, the dependent feed and process bundle+coordinators are updated in the
+workflow engine. Hence, only a falcon superuser who has ability to impersonate other users can update a cluster entity.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/docs/src/site/twiki/restapi/EntityUpdate.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityUpdate.twiki b/docs/src/site/twiki/restapi/EntityUpdate.twiki
index 46b01fc..cbf33db 100644
--- a/docs/src/site/twiki/restapi/EntityUpdate.twiki
+++ b/docs/src/site/twiki/restapi/EntityUpdate.twiki
@@ -8,8 +8,8 @@
Updates the submitted entity.
---++ Parameters
- * :entity-type can be feed or process.
- * :entity-name is name of the feed or process.
+ * :entity-type can be cluster, feed or process.
+ * :entity-name is name of the cluster, feed or process.
* skipDryRun : Optional query param, Falcon skips oozie dryrun when value is set to true.
* doAs <optional query param> allows the current user to impersonate the user passed in doAs when interacting with the Falcon system.
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index b319dd1..1f6be41 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -39,10 +39,14 @@ import org.apache.falcon.entity.v0.EntityGraph;
import org.apache.falcon.entity.v0.EntityIntegrityChecker;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.resource.APIResult.Status;
import org.apache.falcon.resource.EntityList.EntityElement;
import org.apache.falcon.resource.metadata.AbstractMetadataResource;
import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.security.DefaultAuthorizationProvider;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
@@ -50,6 +54,7 @@ import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -333,8 +338,8 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
obtainEntityLocks(oldEntity, "update", tokenList);
StringBuilder result = new StringBuilder("Updated successfully");
- //Update in workflow engine
- if (!DeploymentUtil.isPrism()) {
+ //Update in workflow engine if entity is not a cluster (cluster entity is not scheduled)
+ if (!DeploymentUtil.isPrism() && !entityType.equals(EntityType.CLUSTER)) {
Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity);
newClusters.retainAll(oldClusters); //common clusters for update
@@ -359,6 +364,80 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
}
}
+ /**
+ * Updates scheduled dependent entities of a cluster.
+ *
+ * @param clusterName Name of cluster
+ * @param colo colo
+ * @param skipDryRun Skip dry run during update if set to true
+ * @return APIResult
+ */
+ public APIResult updateClusterDependents(String clusterName, String colo, Boolean skipDryRun) {
+ checkColo(colo);
+ try {
+ Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
+ verifySafemodeOperation(cluster, EntityUtil.ENTITY_OPERATION.UPDATE_CLUSTER_DEPENDENTS);
+ int clusterVersion = cluster.getVersion();
+ StringBuilder result = new StringBuilder("Updating entities dependent on cluster \n");
+ // get dependent entities. check if cluster version changed. if yes, update dependent entities
+ Pair<String, EntityType>[] dependentEntities = EntityIntegrityChecker.referencedBy(cluster);
+ if (dependentEntities == null) {
+ // nothing to update
+ return new APIResult(APIResult.Status.SUCCEEDED, "Cluster "
+ + clusterName + " has no dependent entities");
+ }
+ for (Pair<String, EntityType> depEntity : dependentEntities) {
+ Entity entity = EntityUtil.getEntity(depEntity.second, depEntity.first);
+ switch (entity.getEntityType()) {
+ case FEED:
+ Clusters feedClusters = ((Feed)entity).getClusters();
+ List<org.apache.falcon.entity.v0.feed.Cluster> updatedFeedClusters =
+ new ArrayList<org.apache.falcon.entity.v0.feed.Cluster>();
+ if (feedClusters != null) {
+ for(org.apache.falcon.entity.v0.feed.Cluster feedCluster : feedClusters.getClusters()) {
+ if (feedCluster.getName().equals(clusterName)
+ && feedCluster.getVersion() != clusterVersion) {
+ // update feed cluster entity
+ feedCluster.setVersion(clusterVersion);
+ }
+ updatedFeedClusters.add(feedCluster);
+ }
+ ((Feed)entity).getClusters().getClusters().clear();
+ ((Feed)entity).getClusters().getClusters().addAll(updatedFeedClusters);
+ result.append(update(entity, entity.getEntityType().name(),
+ entity.getName(), skipDryRun).getMessage());
+ }
+ break;
+ case PROCESS:
+ org.apache.falcon.entity.v0.process.Clusters processClusters = ((Process)entity).getClusters();
+ List<org.apache.falcon.entity.v0.process.Cluster> updatedProcClusters =
+ new ArrayList<org.apache.falcon.entity.v0.process.Cluster>();
+ if (processClusters != null) {
+ for(org.apache.falcon.entity.v0.process.Cluster procCluster : processClusters.getClusters()) {
+ if (procCluster.getName().equals(clusterName)
+ && procCluster.getVersion() != clusterVersion) {
+ // update feed cluster entity
+ procCluster.setVersion(clusterVersion);
+ }
+ updatedProcClusters.add(procCluster);
+ }
+ ((Process)entity).getClusters().getClusters().clear();
+ ((Process)entity).getClusters().getClusters().addAll(updatedProcClusters);
+ result.append(update(entity, entity.getEntityType().name(),
+ entity.getName(), skipDryRun).getMessage());
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ return new APIResult(APIResult.Status.SUCCEEDED, result.toString());
+ } catch (FalconException e) {
+ LOG.error("Update failed", e);
+ throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
private void obtainEntityLocks(Entity entity, String command, List<Entity> tokenList)
throws FalconException {
//first obtain lock for the entity for which update is issued.
@@ -397,14 +476,19 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
}
- private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException {
+ private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException, IOException {
if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) {
throw new FalconException(
oldEntity.toShortString() + " can't be updated with " + newEntity.toShortString());
}
if (oldEntity.getEntityType() == EntityType.CLUSTER) {
- throw new FalconException("Update not supported for clusters");
+ final UserGroupInformation authenticatedUGI = CurrentUser.getAuthenticatedUGI();
+ DefaultAuthorizationProvider authorizationProvider = new DefaultAuthorizationProvider();
+ if (!authorizationProvider.isSuperUser(authenticatedUGI)) {
+ throw new FalconException("Permission denied : "
+ + "Cluster entity update can only be performed by superuser.");
+ }
}
String[] props = oldEntity.getEntityType().getImmutableProperties();
@@ -455,7 +539,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
if (entity.getEntityType().equals(EntityType.CLUSTER)) {
return;
} else {
- LOG.error("Entity operation {} is not allowed on non-cluster entities during safemode",
+ LOG.error("Entity operation {} is only allowed on cluster entities during safemode",
operation.name());
throw FalconWebException.newAPIException("Entity operation " + operation.name()
+ " is only allowed on cluster entities during safemode");
@@ -470,6 +554,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource {
return;
}
case SCHEDULE:
+ case UPDATE_CLUSTER_DEPENDENTS:
case SUBMIT_AND_SCHEDULE:
case DELETE:
case RESUME:
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 168f18e..53a9de1 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -18,25 +18,6 @@
package org.apache.falcon.resource.proxy;
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
@@ -58,6 +39,24 @@ import org.apache.falcon.resource.channel.Channel;
import org.apache.falcon.resource.channel.ChannelFactory;
import org.apache.falcon.util.DeploymentUtil;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
/**
* A proxy implementation of the schedulable entity operations.
*/
@@ -380,6 +379,55 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
}
/**
+ * Updates the dependent entities of a cluster in workflow engine.
+ * @param clusterName Name of cluster.
+ * @param ignore colo.
+ * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
+ * @return Result of the validation.
+ */
+ @POST
+ @Path("updateClusterDependents/{clusterName}")
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+ @Monitored(event = "updateClusterDependents")
+ @Override
+ public APIResult updateClusterDependents(
+ @Dimension("entityName") @PathParam("clusterName") final String clusterName,
+ @Dimension("colo") @QueryParam("colo") String ignore,
+ @QueryParam("skipDryRun") final Boolean skipDryRun) {
+
+ final Set<String> allColos = getApplicableColos("cluster", clusterName);
+ Map<String, APIResult> results = new HashMap<String, APIResult>();
+ boolean result = true;
+
+ if (!allColos.isEmpty()) {
+ results.put(FALCON_TAG + "/updateClusterDependents", new EntityProxy("cluster", clusterName) {
+ @Override
+ protected Set<String> getColosToApply() {
+ return allColos;
+ }
+
+ @Override
+ protected APIResult doExecute(String colo) throws FalconException {
+ return getConfigSyncChannel(colo).invoke("updateClusterDependents", clusterName,
+ colo, skipDryRun);
+ }
+ }.execute());
+ }
+
+ for (APIResult apiResult : results.values()) {
+ if (apiResult.getStatus() != APIResult.Status.SUCCEEDED) {
+ result = false;
+ }
+ }
+ // update only if all are updated
+ if (!embeddedMode && result) {
+ results.put(PRISM_TAG, super.updateClusterDependents(clusterName, currentColo, skipDryRun));
+ }
+
+ return consolidateResult(results, APIResult.class);
+ }
+
+ /**
* Force updates the entity.
* @param type Valid options are feed or process.
* @param entityName Name of the entity.
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 9b1ff2a..56dcf87 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -195,7 +195,6 @@ public class TestFalconUnit extends FalconUnitTestBase {
Process process = getEntity(EntityType.PROCESS, PROCESS_NAME);
setDummyProperty(process);
String processXml = process.toString();
-
File file = new File("target/newprocess.xml");
file.createNewFile();
FileWriter fw = new FileWriter(file.getAbsoluteFile());
@@ -208,9 +207,8 @@ public class TestFalconUnit extends FalconUnitTestBase {
result = falconUnitClient.touch(EntityType.PROCESS.name(), PROCESS_NAME, null, true, null);
assertStatus(result);
- process = getEntity(EntityType.PROCESS,
- PROCESS_NAME);
- Assert.assertEquals(process.toString(), processXml);
+ Process process2 = getEntity(EntityType.PROCESS, PROCESS_NAME);
+ Assert.assertEquals(process2.toString(), process.toString());
file.delete();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
index aa15dcc..7b32bd5 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/ConfigSyncService.java
@@ -87,4 +87,20 @@ public class ConfigSyncService extends AbstractEntityManager {
throw FalconWebException.newAPIException(throwable);
}
}
+
+ @POST
+ @Path("updateClusterDependents/{clusterName}")
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Monitored(event = "updateClusterDependents")
+ @Override
+ public APIResult updateClusterDependents(
+ @Dimension("entityName") @PathParam("clusterName") final String clusterName,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @QueryParam("skipDryRun") final Boolean skipDryRun) {
+ try {
+ return super.updateClusterDependents(clusterName, colo, skipDryRun);
+ } catch (Throwable throwable) {
+ throw FalconWebException.newAPIException(throwable);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index e97adff..657ef9e 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -18,8 +18,14 @@
package org.apache.falcon.resource;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconWebException;
+import org.apache.falcon.monitors.Dimension;
+import org.apache.falcon.monitors.Monitored;
+
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -27,16 +33,10 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
-import javax.ws.rs.DELETE;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconWebException;
-import org.apache.falcon.monitors.Dimension;
-import org.apache.falcon.monitors.Monitored;
-
/**
* Entity management operations as REST API for feed and process.
*/
@@ -79,6 +79,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
throw FalconWebException.newAPIException("delete on server is not"
+ " supported.Please run your operation on Prism.", Response.Status.FORBIDDEN);
}
+
/**
* Updates the submitted entity.
* @param request Servlet Request
@@ -103,6 +104,26 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
}
/**
+ * Updates the dependent entities of a cluster in workflow engine.
+ * @param clusterName Name of cluster.
+ * @param ignore colo.
+ * @param skipDryRun Optional query param, Falcon skips oozie dryrun when value is set to true.
+ * @return Result of the validation.
+ */
+ @POST
+ @Path("updateClusterDependents/{clusterName}")
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
+ @Monitored(event = "updateClusterDependents")
+ @Override
+ public APIResult updateClusterDependents(
+ @Dimension("entityName") @PathParam("clusterName") final String clusterName,
+ @Dimension("colo") @QueryParam("colo") String ignore,
+ @QueryParam("skipDryRun") final Boolean skipDryRun) {
+ throw FalconWebException.newAPIException("update on server is not"
+ + " supported.Please run your operation on Prism.", Response.Status.FORBIDDEN);
+ }
+
+ /**
* Submits and schedules an entity.
* @param request Servlet Request
* @param type Valid options are feed or process.
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
new file mode 100644
index 0000000..f5efa37
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconClusterUpdateCLIIT.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.falcon.resource.TestContext;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Map;
+
+/**
+ * Test for Falcon CLI.
+ */
+@Test(groups = {"exhaustive"})
+public class FalconClusterUpdateCLIIT {
+ private InMemoryWriter stream = new InMemoryWriter(System.out);
+ private TestContext context = new TestContext();
+ private Map<String, String> overlay;
+
+ @BeforeClass
+ public void prepare() throws Exception {
+ context.prepare();
+ FalconCLI.OUT.set(stream);
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ clearSafemode();
+ context.deleteEntitiesFromStore();
+ }
+
+
+ public void testUpdateClusterCommands() throws Exception {
+
+ FalconCLI.OUT.set(stream);
+
+ String filePath;
+ overlay = context.getUniqueOverlay();
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), 0);
+ context.setCluster(overlay.get("cluster"));
+ Assert.assertEquals(stream.buffer.toString().trim(),
+ "falcon/default/Submit successful (cluster) " + context.getClusterName());
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), 0);
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -submit -type process -file " + filePath), 0);
+
+
+ // Update cluster here and test that it works
+
+ initSafemode();
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -update -type cluster -file "
+ + filePath + " -name " + overlay.get("cluster")), 0);
+ clearSafemode();
+
+ // Try to update dependent entities
+ Assert.assertEquals(executeWithURL("entity -updateClusterDependents -cluster "
+ + overlay.get("cluster") + " -skipDryRun "), 0);
+
+ // try to update cluster with wrong name, it should fail.
+ initSafemode();
+ overlay = context.getUniqueOverlay();
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_UPDATED_TEMPLATE, overlay);
+ Assert.assertEquals(executeWithURL("entity -update -type cluster -file "
+ + filePath + " -name " + overlay.get("cluster")), -1);
+ clearSafemode();
+ }
+
+
+ private void initSafemode() throws Exception {
+ // Set safemode
+ Assert.assertEquals(new FalconCLI().run(("admin -setsafemode true -url "
+ + TestContext.BASE_URL).split("\\s")), 0);
+ }
+
+ private void clearSafemode() throws Exception {
+ Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url "
+ + TestContext.BASE_URL).split("\\s")), 0);
+ }
+
+ private int executeWithURL(String command) throws Exception {
+ FalconCLI.OUT.get().print("COMMAND IS "+command + " -url " + TestContext.BASE_URL + "\n");
+ return new FalconCLI()
+ .run((command + " -url " + TestContext.BASE_URL).split("\\s+"));
+ }
+
+ private static class InMemoryWriter extends PrintStream {
+
+ private StringBuffer buffer = new StringBuffer();
+
+ public InMemoryWriter(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void println(String x) {
+ clear();
+ buffer.append(x);
+ super.println(x);
+ }
+
+ @SuppressWarnings("UnusedDeclaration")
+ public String getBuffer() {
+ return buffer.toString();
+ }
+
+ public void clear() {
+ buffer.delete(0, buffer.length());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
index f640a69..d2b62b2 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java
@@ -21,7 +21,6 @@ package org.apache.falcon.cli;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.TestContext;
import org.apache.falcon.util.FalconTestUtil;
-import org.apache.falcon.util.StartupProperties;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -84,8 +83,6 @@ public class FalconSafemodeCLIIT {
private void clearSafemode() throws Exception {
Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url "
+ TestContext.BASE_URL).split("\\s")), 0);
- Assert.assertEquals(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"),
- "false");
}
public void testEntityCommandsNotAllowedInSafeMode() throws Exception {
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index f84559f..3cf5c18 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -95,6 +95,7 @@ public class TestContext extends AbstractTestContext {
public static final String DATASOURCE_TEMPLATE3 = "/datasource-template3.xml";
public static final String DATASOURCE_TEMPLATE4 = "/datasource-template4.xml";
public static final String CLUSTER_TEMPLATE = "/cluster-template.xml";
+ public static final String CLUSTER_UPDATED_TEMPLATE = "/cluster-updated-template.xml";
public static final String PIG_PROCESS_TEMPLATE = "/pig-process-template.xml";
public static final String BASE_URL = "https://localhost:41443/falcon-webapp";
http://git-wip-us.apache.org/repos/asf/falcon/blob/f3ff8b27/webapp/src/test/resources/cluster-updated-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/cluster-updated-template.xml b/webapp/src/test/resources/cluster-updated-template.xml
new file mode 100644
index 0000000..f94e897
--- /dev/null
+++ b/webapp/src/test/resources/cluster-updated-template.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<cluster colo="##colo##" description="updated cluster" name="##cluster##" xmlns="uri:falcon:cluster:0.1">
+ <interfaces>
+ <interface type="readonly" endpoint="jail://global:00"
+ version="0.20.2"/>
+ <interface type="write" endpoint="jail://global:00"
+ version="0.20.2"/>
+ <interface type="execute" endpoint="localhost:41021" version="0.20.2"/>
+ <interface type="workflow" endpoint="http://localhost:41000/oozie/"
+ version="3.1"/>
+ <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
+ version="5.4.3"/>
+ <interface type="registry" endpoint="thrift://localhost:49083"
+ version="0.11.0"/>
+ </interfaces>
+ <locations>
+ <location name="staging" path="/projects/falcon/staging"/>
+ <location name="temp" path="/tmp"/>
+ <location name="working" path="/projects/falcon/working"/>
+ </locations>
+ <properties>
+ <property name="test1" value="value1"/>
+ </properties>
+</cluster>