You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:26:25 UTC
[39/51] [partial] falcon git commit: FALCON-1830 Removed code source
directories and updated pom
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
deleted file mode 100644
index c942862..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ /dev/null
@@ -1,719 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.FileSystemStorage;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityGraph;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.ACL;
-import org.apache.falcon.entity.v0.feed.Extract;
-import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.MergeType;
-import org.apache.falcon.entity.v0.feed.Properties;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.group.FeedGroup;
-import org.apache.falcon.group.FeedGroupMap;
-import org.apache.falcon.service.LifecyclePolicyMap;
-import org.apache.falcon.util.DateUtil;
-import org.apache.falcon.util.HadoopQueueUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-/**
- * Parser that parses feed entity definition.
- */
-public class FeedEntityParser extends EntityParser<Feed> {
-
- private static final Logger LOG = LoggerFactory.getLogger(FeedEntityParser.class);
-
- public FeedEntityParser() {
- super(EntityType.FEED);
- }
-
- @Override
- public void validate(Feed feed) throws FalconException {
- if (feed.getTimezone() == null) {
- feed.setTimezone(TimeZone.getTimeZone("UTC"));
- }
-
- if (feed.getClusters() == null) {
- throw new ValidationException("Feed should have at least one cluster");
- }
-
- validateLifecycle(feed);
- validateACL(feed);
- for (Cluster cluster : feed.getClusters().getClusters()) {
- validateEntityExists(EntityType.CLUSTER, cluster.getName());
-
- // Optinal end_date
- if (cluster.getValidity().getEnd() == null) {
- cluster.getValidity().setEnd(DateUtil.NEVER);
- }
-
- validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(),
- cluster.getName());
- validateClusterHasRegistry(feed, cluster);
- validateFeedCutOffPeriod(feed, cluster);
- if (FeedHelper.isImportEnabled(cluster)) {
- validateEntityExists(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
- validateFeedExtractionType(feed, cluster);
- validateFeedImportArgs(cluster);
- validateFeedImportFieldExcludes(cluster);
- }
- if (FeedHelper.isExportEnabled(cluster)) {
- validateEntityExists(EntityType.DATASOURCE, FeedHelper.getExportDatasourceName(cluster));
- validateFeedExportArgs(cluster);
- validateFeedExportFieldExcludes(cluster);
- }
- }
-
- validateFeedStorage(feed);
- validateFeedPath(feed);
- validateFeedPartitionExpression(feed);
- validateFeedGroups(feed);
- validateFeedSLA(feed);
- validateProperties(feed);
- validateHadoopQueue(feed);
-
- // Seems like a good enough entity object for a new one
- // But is this an update ?
-
- Feed oldFeed = ConfigurationStore.get().get(EntityType.FEED, feed.getName());
- if (oldFeed == null) {
- return; // Not an update case
- }
-
- // Is actually an update. Need to iterate over all the processes
- // depending on this feed and see if they are valid with the new
- // feed reference
- EntityGraph graph = EntityGraph.get();
- Set<Entity> referenced = graph.getDependents(oldFeed);
- Set<Process> processes = findProcesses(referenced);
- if (processes.isEmpty()) {
- return;
- }
-
- ensureValidityFor(feed, processes);
- }
-
- private void validateLifecycle(Feed feed) throws FalconException {
- LifecyclePolicyMap map = LifecyclePolicyMap.get();
- for (Cluster cluster : feed.getClusters().getClusters()) {
- if (FeedHelper.isLifecycleEnabled(feed, cluster.getName())) {
- if (FeedHelper.getRetentionStage(feed, cluster.getName()) == null) {
- throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
- + cluster.getName());
- }
- validateRetentionFrequency(feed, cluster.getName());
- for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
- map.get(policyName).validate(feed, cluster.getName());
- }
- }
- }
- }
-
- private void validateRetentionFrequency(Feed feed, String clusterName) throws FalconException {
- Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, clusterName);
- Frequency feedFrequency = feed.getFrequency();
- if (DateUtil.getFrequencyInMillis(retentionFrequency) < DateUtil.getFrequencyInMillis(feedFrequency)) {
- throw new ValidationException("Retention can not be more frequent than data availability.");
- }
- }
-
- private Set<Process> findProcesses(Set<Entity> referenced) {
- Set<Process> processes = new HashSet<Process>();
- for (Entity entity : referenced) {
- if (entity.getEntityType() == EntityType.PROCESS) {
- processes.add((Process) entity);
- }
- }
- return processes;
- }
-
- private void validateFeedSLA(Feed feed) throws FalconException {
- for (Cluster cluster : feed.getClusters().getClusters()) {
- Sla clusterSla = FeedHelper.getSLA(cluster, feed);
- if (clusterSla != null) {
- Frequency slaLowExpression = clusterSla.getSlaLow();
- ExpressionHelper evaluator = ExpressionHelper.get();
- ExpressionHelper.setReferenceDate(new Date());
- Date slaLow = new Date(evaluator.evaluate(slaLowExpression.toString(), Long.class));
-
- Frequency slaHighExpression = clusterSla.getSlaHigh();
- Date slaHigh = new Date(evaluator.evaluate(slaHighExpression.toString(), Long.class));
-
- if (slaLow.after(slaHigh)) {
- throw new ValidationException("slaLow of Feed: " + slaLowExpression
- + "is greater than slaHigh: " + slaHighExpression
- + " for cluster: " + cluster.getName()
- );
- }
-
- // test that slaHigh is less than retention
- Frequency retentionExpression = cluster.getRetention().getLimit();
- Date retention = new Date(evaluator.evaluate(retentionExpression.toString(), Long.class));
- if (slaHigh.after(retention)) {
- throw new ValidationException("slaHigh of Feed: " + slaHighExpression
- + " is greater than retention of the feed: " + retentionExpression
- + " for cluster: " + cluster.getName()
- );
- }
-
-
- }
- }
- }
-
- private void validateFeedGroups(Feed feed) throws FalconException {
- String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
- final Storage storage = FeedHelper.createStorage(feed);
- String defaultPath = storage.getUriTemplate(LocationType.DATA);
- for (Cluster cluster : feed.getClusters().getClusters()) {
- final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate(LocationType.DATA);
- if (!FeedGroup.getDatePattern(uriTemplate).equals(
- FeedGroup.getDatePattern(defaultPath))) {
- throw new ValidationException("Feeds default path pattern: "
- + storage.getUriTemplate(LocationType.DATA)
- + ", does not match with cluster: "
- + cluster.getName()
- + " path pattern: "
- + uriTemplate);
- }
- }
- for (String groupName : groupNames) {
- FeedGroup group = FeedGroupMap.get().getGroupsMapping().get(groupName);
- if (group != null && !group.canContainFeed(feed)) {
- throw new ValidationException(
- "Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
- + ", path pattern: " + storage
- + " does not match with group: " + group.getName() + "'s frequency: "
- + group.getFrequency()
- + ", date pattern: " + group.getDatePattern());
- }
- }
- }
-
- private void ensureValidityFor(Feed newFeed, Set<Process> processes) throws FalconException {
- for (Process process : processes) {
- try {
- ensureValidityFor(newFeed, process);
- } catch (FalconException e) {
- throw new ValidationException(
- "Process " + process.getName() + " is not compatible " + "with changes to feed "
- + newFeed.getName(), e);
- }
- }
- }
-
- private void ensureValidityFor(Feed newFeed, Process process) throws FalconException {
- for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
- String clusterName = cluster.getName();
- if (process.getInputs() != null) {
- for (Input input : process.getInputs().getInputs()) {
- if (!input.getFeed().equals(newFeed.getName())) {
- continue;
- }
- CrossEntityValidations.validateFeedDefinedForCluster(newFeed, clusterName);
- CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), newFeed, clusterName);
- CrossEntityValidations.validateInstanceRange(process, input, newFeed);
-
- validateInputPartition(newFeed, input);
- }
- }
-
- if (process.getOutputs() != null) {
- for (Output output : process.getOutputs().getOutputs()) {
- if (!output.getFeed().equals(newFeed.getName())) {
- continue;
- }
- CrossEntityValidations.validateFeedDefinedForCluster(newFeed, clusterName);
- CrossEntityValidations.validateInstance(process, output, newFeed);
- }
- }
- LOG.debug("Verified and found {} to be valid for new definition of {}",
- process.getName(), newFeed.getName());
- }
- }
-
- private void validateInputPartition(Feed newFeed, Input input) throws FalconException {
- if (input.getPartition() == null) {
- return;
- }
-
- final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(newFeed);
- if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
- CrossEntityValidations.validateInputPartition(input, newFeed);
- } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
- throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
- }
- }
-
- private void validateClusterValidity(Date start, Date end, String clusterName) throws FalconException {
- try {
- if (start.after(end)) {
- throw new ValidationException("Feed start time: " + start + " cannot be after feed end time: " + end
- + " for cluster: " + clusterName);
- }
- } catch (ValidationException e) {
- throw new ValidationException(e);
- } catch (Exception e) {
- throw new FalconException(e);
- }
- }
-
- private void validateFeedCutOffPeriod(Feed feed, Cluster cluster) throws FalconException {
- ExpressionHelper evaluator = ExpressionHelper.get();
-
- String feedRetention = cluster.getRetention().getLimit().toString();
- long retentionPeriod = evaluator.evaluate(feedRetention, Long.class);
-
- if (feed.getLateArrival() == null) {
- LOG.debug("Feed's late arrival cut-off not set");
- return;
- }
- String feedCutoff = feed.getLateArrival().getCutOff().toString();
- long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class);
-
- if (retentionPeriod < feedCutOffPeriod) {
- throw new ValidationException(
- "Feed's retention limit: " + feedRetention + " of referenced cluster " + cluster.getName()
- + " should be more than feed's late arrival cut-off period: " + feedCutoff + " for feed: "
- + feed.getName());
- }
- }
-
- private void validateFeedPartitionExpression(Feed feed) throws FalconException {
- int numSourceClusters = 0, numTrgClusters = 0;
- Set<String> clusters = new HashSet<String>();
- for (Cluster cl : feed.getClusters().getClusters()) {
- if (!clusters.add(cl.getName())) {
- throw new ValidationException("Cluster: " + cl.getName()
- + " is defined more than once for feed: " + feed.getName());
- }
- if (cl.getType() == ClusterType.SOURCE) {
- numSourceClusters++;
- } else if (cl.getType() == ClusterType.TARGET) {
- numTrgClusters++;
- }
- }
-
- if (numTrgClusters >= 1 && numSourceClusters == 0) {
- throw new ValidationException("Feed: " + feed.getName()
- + " should have atleast one source cluster defined");
- }
-
- int feedParts = feed.getPartitions() != null ? feed.getPartitions().getPartitions().size() : 0;
-
- for (Cluster cluster : feed.getClusters().getClusters()) {
-
- if (cluster.getType() == ClusterType.SOURCE && numSourceClusters > 1 && numTrgClusters >= 1) {
- String part = FeedHelper.normalizePartitionExpression(cluster.getPartition());
- if (StringUtils.split(part, '/').length == 0) {
- throw new ValidationException(
- "Partition expression has to be specified for cluster " + cluster.getName()
- + " as there are more than one source clusters");
- }
- validateClusterExpDefined(cluster);
-
- } else if (cluster.getType() == ClusterType.TARGET) {
-
- for (Cluster src : feed.getClusters().getClusters()) {
- if (src.getType() == ClusterType.SOURCE) {
- String part = FeedHelper.normalizePartitionExpression(src.getPartition(),
- cluster.getPartition());
- int numParts = StringUtils.split(part, '/').length;
- if (numParts > feedParts) {
- throw new ValidationException(
- "Partition for " + src.getName() + " and " + cluster.getName()
- + "clusters is more than the number of partitions defined in feed");
- }
- }
- }
-
- if (numTrgClusters > 1 && numSourceClusters >= 1) {
- validateClusterExpDefined(cluster);
- }
- }
- }
- }
-
- private void validateClusterExpDefined(Cluster cl) throws FalconException {
- if (cl.getPartition() == null) {
- return;
- }
-
- org.apache.falcon.entity.v0.cluster.Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, cl.getName());
- String part = FeedHelper.normalizePartitionExpression(cl.getPartition());
- if (FeedHelper.evaluateClusterExp(cluster, part).equals(part)) {
- throw new ValidationException(
- "Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
- }
- }
-
- /**
- * Ensure table is already defined in the catalog registry.
- * Does not matter for FileSystem storage.
- */
- private void validateFeedStorage(Feed feed) throws FalconException {
- final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
- validateMultipleSourcesExist(feed, baseFeedStorageType);
- validateUniformStorageType(feed, baseFeedStorageType);
- validatePartitions(feed, baseFeedStorageType);
- validateStorageExists(feed);
- }
-
- private void validateMultipleSourcesExist(Feed feed, Storage.TYPE baseFeedStorageType) throws FalconException {
- if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
- return;
- }
-
- // validate that there is only one source cluster
- int numberOfSourceClusters = 0;
- for (Cluster cluster : feed.getClusters().getClusters()) {
- if (cluster.getType() == ClusterType.SOURCE) {
- numberOfSourceClusters++;
- }
- }
-
- if (numberOfSourceClusters > 1) {
- throw new ValidationException("Multiple sources are not supported for feed with table storage: "
- + feed.getName());
- }
- }
-
- private void validateUniformStorageType(Feed feed, Storage.TYPE feedStorageType) throws FalconException {
- for (Cluster cluster : feed.getClusters().getClusters()) {
- Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);
-
- if (feedStorageType != feedClusterStorageType) {
- throw new ValidationException("The storage type is not uniform for cluster: " + cluster.getName());
- }
- }
- }
-
- private void validateClusterHasRegistry(Feed feed, Cluster cluster) throws FalconException {
- Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);
-
- if (feedClusterStorageType != Storage.TYPE.TABLE) {
- return;
- }
-
- org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = EntityUtil.getEntity(EntityType.CLUSTER,
- cluster.getName());
- if (ClusterHelper.getRegistryEndPoint(clusterEntity) == null) {
- throw new ValidationException("Cluster should have registry interface defined: " + clusterEntity.getName());
- }
- }
-
- private void validatePartitions(Feed feed, Storage.TYPE storageType) throws FalconException {
- if (storageType == Storage.TYPE.TABLE && feed.getPartitions() != null) {
- throw new ValidationException("Partitions are not supported for feeds with table storage. "
- + "It should be defined as part of the table URI. "
- + feed.getName());
- }
- }
-
- private void validateStorageExists(Feed feed) throws FalconException {
- StringBuilder buffer = new StringBuilder();
- for (Cluster cluster : feed.getClusters().getClusters()) {
- org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
- EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
- if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
- continue;
- }
-
- final Storage storage = FeedHelper.createStorage(cluster, feed);
- // this is only true for table, filesystem always returns true
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- continue;
- }
-
- CatalogStorage catalogStorage = (CatalogStorage) storage;
- Configuration clusterConf = ClusterHelper.getConfiguration(clusterEntity);
- if (!CatalogServiceFactory.getCatalogService().tableExists(
- clusterConf, catalogStorage.getCatalogUrl(),
- catalogStorage.getDatabase(), catalogStorage.getTable())) {
- buffer.append("Table [")
- .append(catalogStorage.getTable())
- .append("] does not exist for feed: ")
- .append(feed.getName())
- .append(" in cluster: ")
- .append(cluster.getName());
- }
- }
-
- if (buffer.length() > 0) {
- throw new ValidationException(buffer.toString());
- }
- }
-
- /**
- * Validate ACL if authorization is enabled.
- *
- * @param feed Feed entity
- * @throws ValidationException
- */
- protected void validateACL(Feed feed) throws FalconException {
- if (isAuthorizationDisabled) {
- return;
- }
-
- final ACL feedACL = feed.getACL();
- validateACLOwnerAndGroup(feedACL);
- try {
- authorize(feed.getName(), feedACL);
- } catch (AuthorizationException e) {
- throw new ValidationException(e);
- }
-
- for (Cluster cluster : feed.getClusters().getClusters()) {
- org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
- EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
- if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
- continue;
- }
-
- final Storage storage = FeedHelper.createStorage(cluster, feed);
- try {
- storage.validateACL(feedACL);
- } catch(FalconException e) {
- throw new ValidationException(e);
- }
- }
- }
-
- /**
- * Validate Hadoop cluster queue names specified in the Feed entity defintion.
- *
- * First tries to look for queue name specified in the Lifecycle, next queueName property
- * and checks its validity against the Hadoop cluster scheduler info.
- *
- * Hadoop cluster queue is validated only if YARN RM webaddress is specified in the
- * cluster entity properties.
- *
- * Throws exception if the specified queue name is not a valid hadoop cluster queue.
- *
- * @param feed
- * @throws FalconException
- */
-
- protected void validateHadoopQueue(Feed feed) throws FalconException {
- for (Cluster cluster : feed.getClusters().getClusters()) {
- Set<String> feedQueue = getQueueNamesUsedInFeed(feed, cluster);
-
- org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
- EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
-
- String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
- if (StringUtils.isBlank(rmURL)) {
- rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
- }
-
- if (StringUtils.isNotBlank(rmURL)) {
- LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL);
- Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);
-
- for (String q: feedQueue) {
- if (queueNames.contains(q)) {
- LOG.info("Validated presence of retention queue specified in feed - {}", q);
- } else {
- String strMsg = String.format("The hadoop queue name %s specified "
- + "for cluster %s is invalid.", q, cluster.getName());
- LOG.info(strMsg);
- throw new FalconException(strMsg);
- }
- }
- }
- }
- }
-
- protected Set<String> getQueueNamesUsedInFeed(Feed feed, Cluster cluster) throws FalconException {
- Set<String> queueList = new HashSet<>();
- addToQueueList(FeedHelper.getRetentionQueue(feed, cluster), queueList);
- if (cluster.getType() == ClusterType.TARGET) {
- addToQueueList(FeedHelper.getReplicationQueue(feed, cluster), queueList);
- }
- return queueList;
- }
-
- private void addToQueueList(String queueName, Set<String> queueList) {
- if (StringUtils.isBlank(queueName)) {
- queueList.add(queueName);
- }
- }
-
- protected void validateProperties(Feed feed) throws ValidationException {
- Properties properties = feed.getProperties();
- if (properties == null) {
- return; // feed has no properties to validate.
- }
-
- List<Property> propertyList = feed.getProperties().getProperties();
- HashSet<String> propertyKeys = new HashSet<String>();
- for (Property prop : propertyList) {
- if (StringUtils.isBlank(prop.getName())) {
- throw new ValidationException("Property name and value cannot be empty for Feed : "
- + feed.getName());
- }
- if (!propertyKeys.add(prop.getName())) {
- throw new ValidationException("Multiple properties with same name found for Feed : "
- + feed.getName());
- }
- }
- }
-
- /**
- * Validate if FileSystem based feed contains location type data.
- *
- * @param feed Feed entity
- * @throws FalconException
- */
- private void validateFeedPath(Feed feed) throws FalconException {
- if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
- return;
- }
-
- for (Cluster cluster : feed.getClusters().getClusters()) {
- List<Location> locations = FeedHelper.getLocations(cluster, feed);
- Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
-
- if (dataLocation == null) {
- throw new ValidationException(feed.getName() + " is a FileSystem based feed "
- + "but it doesn't contain location type - data in cluster " + cluster.getName());
- }
-
- }
- }
-
- /**
- * Validate extraction and merge type combination. Currently supported combo:
- *
- * ExtractionType = FULL and MergeType = SNAPSHOT.
- * ExtractionType = INCREMENTAL and MergeType = APPEND.
- *
- * @param feed Feed entity
- * @param cluster Cluster referenced in the Feed definition
- * @throws FalconException
- */
-
- private void validateFeedExtractionType(Feed feed, Cluster cluster) throws FalconException {
- Extract extract = cluster.getImport().getSource().getExtract();
-
- if (ExtractMethod.FULL == extract.getType()) {
- if ((MergeType.SNAPSHOT != extract.getMergepolicy())
- || (extract.getDeltacolumn() != null)) {
- throw new ValidationException(String.format("Feed %s is using FULL "
- + "extract method but specifies either a superfluous "
- + "deltacolumn or a mergepolicy other than snapshot", feed.getName()));
- }
- } else {
- throw new ValidationException(String.format("Feed %s is using unsupported "
- + "extraction mechanism %s", feed.getName(), extract.getType().value()));
- }
- }
-
- /**
- * Validate improt arguments.
- * @param feedCluster Cluster referenced in the feed
- */
- private void validateFeedImportArgs(Cluster feedCluster) throws FalconException {
- Map<String, String> args = FeedHelper.getImportArguments(feedCluster);
- validateSqoopArgs(args);
- }
-
- /**
- * Validate sqoop arguments.
- * @param args Map<String, String> arguments
- */
- private void validateSqoopArgs(Map<String, String> args) throws FalconException {
- int numMappers = 1;
- if (args.containsKey("--num-mappers")) {
- numMappers = Integer.parseInt(args.get("--num-mappers"));
- }
- if ((numMappers > 1) && (!args.containsKey("--split-by"))) {
- throw new ValidationException(String.format("Feed import expects "
- + "--split-by column when --num-mappers > 1"));
- }
- }
-
- private void validateFeedImportFieldExcludes(Cluster feedCluster) throws FalconException {
- if (FeedHelper.isFieldExcludes(feedCluster.getImport().getSource())) {
- throw new ValidationException(String.format("Field excludes are not supported "
- + "currently in Feed import policy"));
- }
- }
-
- /**
- * Validate export arguments.
- * @param feedCluster Cluster referenced in the feed
- */
- private void validateFeedExportArgs(Cluster feedCluster) throws FalconException {
- Map<String, String> args = FeedHelper.getExportArguments(feedCluster);
- Map<String, String> validArgs = new HashMap<>();
- validArgs.put("--num-mappers", "");
- validArgs.put("--update-key" , "");
- validArgs.put("--input-null-string", "");
- validArgs.put("--input-null-non-string", "");
-
- for(Map.Entry<String, String> e : args.entrySet()) {
- if (!validArgs.containsKey(e.getKey())) {
- throw new ValidationException(String.format("Feed export argument %s is invalid.", e.getKey()));
- }
- }
- }
-
- private void validateFeedExportFieldExcludes(Cluster feedCluster) throws FalconException {
- if (FeedHelper.isFieldExcludes(feedCluster.getExport().getTarget())) {
- throw new ValidationException(String.format("Field excludes are not supported "
- + "currently in Feed import policy"));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
deleted file mode 100644
index 16fd8b3..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.process.Properties;
-import org.apache.falcon.entity.v0.process.Property;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.ACL;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.util.DateUtil;
-import org.apache.falcon.util.HadoopQueueUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-/**
- * Concrete Parser which has XML parsing and validation logic for Process XML.
- */
-public class ProcessEntityParser extends EntityParser<Process> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ProcessEntityParser.class);
-
- public ProcessEntityParser() {
- super(EntityType.PROCESS);
- }
-
- @Override
- public void validate(Process process) throws FalconException {
- if (process.getTimezone() == null) {
- process.setTimezone(TimeZone.getTimeZone("UTC"));
- }
-
- validateACL(process);
- // check if dependent entities exists
- Set<String> clusters = new HashSet<String>();
- for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
- String clusterName = cluster.getName();
- if (!clusters.add(cluster.getName())) {
- throw new ValidationException("Cluster: " + cluster.getName()
- + " is defined more than once for process: " + process.getName());
- }
- validateEntityExists(EntityType.CLUSTER, clusterName);
-
- // Optinal end_date
- if (cluster.getValidity().getEnd() == null) {
- cluster.getValidity().setEnd(DateUtil.NEVER);
- }
-
- validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
- validateHDFSPaths(process, clusterName);
- validateProperties(process);
-
- if (process.getInputs() != null) {
- for (Input input : process.getInputs().getInputs()) {
- validateEntityExists(EntityType.FEED, input.getFeed());
- Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
- CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
- CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
- CrossEntityValidations.validateInstanceRange(process, input, feed);
- validateInputPartition(input, feed);
- validateOptionalInputsForTableStorage(feed, input);
- }
- }
-
- if (process.getOutputs() != null) {
- for (Output output : process.getOutputs().getOutputs()) {
- validateEntityExists(EntityType.FEED, output.getFeed());
- Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
- CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
- CrossEntityValidations.validateInstance(process, output, feed);
- }
- }
- }
- validateDatasetName(process.getInputs(), process.getOutputs());
- validateLateInputs(process);
- validateProcessSLA(process);
- validateHadoopQueue(process);
- }
-
-
- private void validateProcessSLA(Process process) throws FalconException {
- if (process.getSla() != null) {
- ExpressionHelper evaluator = ExpressionHelper.get();
- ExpressionHelper.setReferenceDate(new Date());
- Frequency shouldStartExpression = process.getSla().getShouldStartIn();
- Frequency shouldEndExpression = process.getSla().getShouldEndIn();
- Frequency timeoutExpression = process.getTimeout();
-
- if (shouldStartExpression != null){
- Date shouldStart = new Date(evaluator.evaluate(shouldStartExpression.toString(), Long.class));
-
- if (shouldEndExpression != null) {
- Date shouldEnd = new Date(evaluator.evaluate(shouldEndExpression.toString(), Long.class));
- if (shouldStart.after(shouldEnd)) {
- throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression
- + "is greater than shouldEndIn: "
- + shouldEndExpression);
- }
- }
-
- if (timeoutExpression != null) {
- Date timeout = new Date(evaluator.evaluate(timeoutExpression.toString(), Long.class));
- if (timeout.before(shouldStart)) {
- throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression
- + " is greater than timeout: " + process.getTimeout());
- }
- }
- }
- }
- }
- /**
- * Validate if the user submitting this entity has access to the specific dirs on HDFS.
- *
- * @param process process
- * @param clusterName cluster the process is materialized on
- * @throws FalconException
- */
- private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
- org.apache.falcon.entity.v0.cluster.Cluster cluster =
- ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
-
- if (!EntityUtil.responsibleFor(cluster.getColo())) {
- return;
- }
-
- String workflowPath = process.getWorkflow().getPath();
- String libPath = process.getWorkflow().getLib();
- String nameNode = getNameNode(cluster);
- try {
- Configuration configuration = ClusterHelper.getConfiguration(cluster);
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
- if (!fs.exists(new Path(workflowPath))) {
- throw new ValidationException(
- "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
- }
-
- if (StringUtils.isNotBlank(libPath)) {
- String[] libPaths = libPath.split(EntityUtil.WF_LIB_SEPARATOR);
- for (String path : libPaths) {
- if (!fs.exists(new Path(path))) {
- throw new ValidationException("Lib path: " + path + " does not exists in HDFS: " + nameNode);
- }
- }
- }
- } catch (IOException e) {
- throw new FalconException("Error validating workflow path " + workflowPath, e);
- }
- }
-
- private String getNameNode(Cluster cluster) throws ValidationException {
- // cluster should never be null as it is validated while submitting feeds.
- if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
- throw new ValidationException(
- "Cannot get valid nameNode scheme from write interface of cluster: " + cluster.getName());
- }
- return ClusterHelper.getStorageUrl(cluster);
- }
-
- private void validateProcessValidity(Date start, Date end) throws FalconException {
- try {
- if (!start.before(end)) {
- throw new ValidationException(
- "Process start time: " + start + " should be before process end time: " + end);
- }
- } catch (ValidationException e) {
- throw new ValidationException(e);
- } catch (Exception e) {
- throw new FalconException(e);
- }
- }
-
- private void validateInputPartition(Input input, Feed feed) throws FalconException {
- if (input.getPartition() == null) {
- return;
- }
-
- final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
- if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
- CrossEntityValidations.validateInputPartition(input, feed);
- } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
- throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
- }
- }
-
- private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
- Set<String> datasetNames = new HashSet<String>();
- if (inputs != null) {
- for (Input input : inputs.getInputs()) {
- if (!datasetNames.add(input.getName())) {
- throw new ValidationException("Input name: " + input.getName() + " is already used");
- }
- }
- }
-
- if (outputs != null) {
- for (Output output : outputs.getOutputs()) {
- if (!datasetNames.add(output.getName())) {
- throw new ValidationException("Output name: " + output.getName() + " is already used");
- }
- }
- }
- }
-
- private void validateLateInputs(Process process) throws ValidationException {
- if (process.getLateProcess() == null) {
- return;
- }
-
- Map<String, String> feeds = new HashMap<String, String>();
- if (process.getInputs() != null) {
- for (Input in : process.getInputs().getInputs()) {
- feeds.put(in.getName(), in.getFeed());
- }
- }
-
- for (LateInput lp : process.getLateProcess().getLateInputs()) {
- if (!feeds.keySet().contains(lp.getInput())) {
- throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
- }
-
- try {
- Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
- if (feed.getLateArrival() == null) {
- throw new ValidationException(
- "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
- }
- } catch (FalconException e) {
- throw new ValidationException(e);
- }
- }
- }
-
- private void validateOptionalInputsForTableStorage(Feed feed, Input input) throws FalconException {
- if (input.isOptional() && FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
- throw new ValidationException("Optional Input is not supported for feeds with table storage! "
- + input.getName());
- }
- }
-
- /**
- * Validate ACL if authorization is enabled.
- *
- * @param process process entity
- * @throws ValidationException
- */
- protected void validateACL(Process process) throws FalconException {
- if (isAuthorizationDisabled) {
- return;
- }
-
- // Validate the entity owner is logged-in, authenticated user if authorization is enabled
- ACL processACL = process.getACL();
- if (processACL == null) {
- throw new ValidationException("Process ACL cannot be empty for: " + process.getName());
- }
-
- validateACLOwnerAndGroup(processACL);
-
- try {
- authorize(process.getName(), processACL);
- } catch (AuthorizationException e) {
- throw new ValidationException(e);
- }
- }
-
- protected void validateProperties(Process process) throws ValidationException {
- Properties properties = process.getProperties();
- if (properties == null) {
- return; // Cluster has no properties to validate.
- }
-
- List<Property> propertyList = process.getProperties().getProperties();
- HashSet<String> propertyKeys = new HashSet<String>();
- for (Property prop : propertyList) {
- if (StringUtils.isBlank(prop.getName())) {
- throw new ValidationException("Property name and value cannot be empty for Process : "
- + process.getName());
- }
- if (!propertyKeys.add(prop.getName())) {
- throw new ValidationException("Multiple properties with same name found for Process : "
- + process.getName());
- }
- }
- }
-
- private void validateHadoopQueue(Process process) throws FalconException {
- // get queue name specified in the process entity
- String processQueueName = null;
- java.util.Properties props = EntityUtil.getEntityProperties(process);
- if ((props != null) && (props.containsKey(EntityUtil.MR_QUEUE_NAME))) {
- processQueueName = props.getProperty(EntityUtil.MR_QUEUE_NAME);
- } else {
- return;
- }
-
- // iterate through each cluster in process entity to check if the cluster has the process entity queue
- for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
- String clusterName = cluster.getName();
- org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
- ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
-
- String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
- if (rmURL == null) {
- rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
- }
-
- if (rmURL != null) {
- LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL);
- Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);
-
- if (queueNames.contains(processQueueName)) {
- LOG.info("Validated presence of queue {} specified in process "
- + "entity for cluster {}", processQueueName, clusterName);
- } else {
- String strMsg = String.format("The hadoop queue name %s specified in process "
- + "entity for cluster %s is invalid.", processQueueName, cluster.getName());
- LOG.info(strMsg);
- throw new FalconException(strMsg);
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java b/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
deleted file mode 100644
index 98f1cb9..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Raised when an entity definition fails parsing or validation.
- */
-public class ValidationException extends FalconException {
-
-    private static final long serialVersionUID = -4502166408759507355L;
-
-    /**
-     * @param message description of the validation failure
-     */
-    public ValidationException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message description of the validation failure
-     * @param e underlying cause
-     */
-    public ValidationException(String message, Exception e) {
-        super(message, e);
-    }
-
-    /**
-     * @param e underlying cause, passed straight to the superclass constructor
-     */
-    public ValidationException(Exception e) {
-        super(e);
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
deleted file mode 100644
index 9c7a932..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.commons.codec.CharEncoding;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.service.ConfigurationChangeListener;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Persistent store for falcon entities.
- *
- * <p>Entities are held in an in-memory dictionary keyed by entity type and name. When the
- * startup property {@code config.store.persist} is true (the default), each published entity is
- * also marshalled to {@code <config.store.uri>/<type>/<url-encoded name>.xml}, removed entities
- * are moved under {@code <config.store.uri>/archive/<type>}, and stored entities are restored on
- * {@link #init()}. Registered {@link ConfigurationChangeListener}s are notified of adds,
- * changes, reloads and removals.
- */
-public final class ConfigurationStore implements FalconService {
-
-    private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] {
-        EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, EntityType.DATASOURCE, };
-    public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED,
-        EntityType.CLUSTER, };
-
-    private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
-    private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
-    private static final String UTF_8 = CharEncoding.UTF_8;
-
-    /** Number of threads used to restore the entities of one type in parallel. */
-    private static final int RESTORE_THREAD_COUNT = 100;
-    /** How long loadEntity waits for the restore threads of one entity type. */
-    private static final long RESTORE_TIMEOUT_MINUTES = 10;
-
-    private final boolean shouldPersist;
-
-    // Must be initialized before STORE below: the constructor uses it via initializeFileSystem().
-    private static final FsPermission STORE_PERMISSION =
-        new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-
-    private final Set<ConfigurationChangeListener> listeners = new LinkedHashSet<ConfigurationChangeListener>();
-
-    // Per-thread entity staged by initiateUpdate(); get() returns it in place of the stored copy.
-    private final ThreadLocal<Entity> updatesInProgress = new ThreadLocal<Entity>();
-
-    private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary
-        = new HashMap<EntityType, ConcurrentHashMap<String, Entity>>();
-
-    // Placeholder value: when get() finds it in the dictionary it restores the entity from the store.
-    private static final Entity NULL = new Entity() {
-        @Override
-        public String getName() {
-            return "NULL";
-        }
-
-        @Override
-        public String getTags() { return null; }
-
-        @Override
-        public AccessControlList getACL() {
-            return null;
-        }
-    };
-
-    private static final ConfigurationStore STORE = new ConfigurationStore();
-
-    public static ConfigurationStore get() {
-        return STORE;
-    }
-
-    private FileSystem fs;
-    private Path storePath;
-
-    private ConfigurationStore() {
-        for (EntityType type : EntityType.values()) {
-            dictionary.put(type, new ConcurrentHashMap<String, Entity>());
-        }
-
-        shouldPersist = Boolean.parseBoolean(StartupProperties.get().getProperty("config.store.persist", "true"));
-        if (shouldPersist) {
-            String uri = StartupProperties.get().getProperty("config.store.uri");
-            storePath = new Path(uri);
-            fs = initializeFileSystem();
-        }
-    }
-
-    /**
-     * Falcon owns this dir on HDFS which no one has permissions to read.
-     * Creates the store directory with owner-only permissions if it does not exist.
-     *
-     * @return FileSystem handle
-     */
-    private FileSystem initializeFileSystem() {
-        try {
-            FileSystem fileSystem =
-                HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
-            if (!fileSystem.exists(storePath)) {
-                LOG.info("Creating configuration store directory: {}", storePath);
-                // set permissions so config store dir is owned by falcon alone
-                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
-            }
-
-            return fileSystem;
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to bring up config store for path: " + storePath, e);
-        }
-    }
-
-    /**
-     * Registers the listeners named in {@code configstore.listeners} (comma separated) and,
-     * when persistence is enabled, restores all stored entities in {@code ENTITY_LOAD_ORDER}.
-     */
-    @Override
-    public void init() throws FalconException {
-        String listenerClassNames = StartupProperties.get().
-            getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph");
-        for (String listenerClassName : listenerClassNames.split(",")) {
-            listenerClassName = listenerClassName.trim();
-            if (listenerClassName.isEmpty()) {
-                continue;
-            }
-            ConfigurationChangeListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
-            registerListener(listener);
-        }
-
-        if (shouldPersist) {
-            for (final EntityType type : ENTITY_LOAD_ORDER) {
-                loadEntity(type);
-            }
-        }
-    }
-
-    /**
-     * Restores every stored entity of one type into the dictionary using a thread pool, then
-     * notifies listeners via onReload.
-     *
-     * @param type entity type to restore
-     * @throws FalconException if the store cannot be listed, any entity fails to restore,
-     *         or the calling thread is interrupted while waiting
-     */
-    private void loadEntity(final EntityType type) throws FalconException {
-        final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
-        FileStatus[] files;
-        try {
-            files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
-        } catch (IOException e) {
-            throw new FalconException("Unable to restore configurations", e);
-        }
-        if (files == null) {
-            return;
-        }
-
-        final ExecutorService service = Executors.newFixedThreadPool(RESTORE_THREAD_COUNT);
-        try {
-            for (final FileStatus file : files) {
-                service.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            String fileName = file.getPath().getName();
-                            // drop the ".xml" suffix appended by persist()
-                            String encodedEntityName = fileName.substring(0, fileName.length() - 4);
-                            String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
-                            Entity entity = restore(type, entityName);
-                            entityMap.put(entityName, entity);
-                        } catch (IOException | FalconException e) {
-                            // Failure is detected below by the entity count check.
-                            LOG.error("Unable to restore entity of type {} from {}", type, file.getPath(), e);
-                        }
-                    }
-                });
-            }
-            service.shutdown();
-            if (service.awaitTermination(RESTORE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
-                LOG.info("Restored Configurations for entity type: {} ", type.name());
-            } else {
-                LOG.warn("Time out happened while waiting for all threads to finish while restoring entities "
-                    + "for type: {}", type.name());
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new FalconException("Interrupted while restoring configurations for entity type "
-                + type.name(), e);
-        } finally {
-            // No-op once all tasks finished; stops leftover restore threads on timeout/interrupt.
-            service.shutdownNow();
-        }
-
-        // Checking if all entities were loaded
-        if (entityMap.size() != files.length) {
-            throw new FalconException("Unable to restore configurations for entity type " + type.name());
-        }
-        for (Entity entity : entityMap.values()) {
-            onReload(entity);
-        }
-    }
-
-    public void registerListener(ConfigurationChangeListener listener) {
-        listeners.add(listener);
-    }
-
-    public void unregisterListener(ConfigurationChangeListener listener) {
-        listeners.remove(listener);
-    }
-
-    /**
-     * Adds a new entity: persists it, notifies listeners, then registers it in memory.
-     *
-     * @param type - EntityType that need to be published
-     * @param entity - Reference to the Entity Object
-     * @throws EntityAlreadyExistsException if an entity with the same type and name exists
-     * @throws FalconException on persistence or listener failure
-     */
-    public synchronized void publish(EntityType type, Entity entity) throws FalconException {
-        try {
-            if (get(type, entity.getName()) == null) {
-                persist(type, entity);
-                onAdd(entity);
-                dictionary.get(type).put(entity.getName(), entity);
-            } else {
-                throw new EntityAlreadyExistsException(
-                    entity.toShortString() + " already registered with configuration store. "
-                        + "Can't be submitted again. Try removing before submitting.");
-            }
-        } catch (IOException e) {
-            throw new StoreAccessException(e);
-        }
-        AUDIT.info(type + "/" + entity.getName() + " is published into config store");
-    }
-
-    private synchronized void updateInternal(EntityType type, Entity entity) throws FalconException {
-        try {
-            if (get(type, entity.getName()) != null) {
-                persist(type, entity);
-                ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
-                Entity oldEntity = entityMap.get(entity.getName());
-                onChange(oldEntity, entity);
-                entityMap.put(entity.getName(), entity);
-            } else {
-                throw new FalconException(entity.toShortString() + " doesn't exist");
-            }
-        } catch (IOException e) {
-            throw new StoreAccessException(e);
-        }
-        AUDIT.info(type + "/" + entity.getName() + " is replaced into config store");
-    }
-
-    /**
-     * Replaces a stored entity. The same entity instance must first have been staged on this
-     * thread with {@link #initiateUpdate(Entity)}.
-     */
-    public synchronized void update(EntityType type, Entity entity) throws FalconException {
-        if (updatesInProgress.get() == entity) {
-            updateInternal(type, entity);
-        } else {
-            throw new FalconException(entity.toShortString() + " is not initialized for update");
-        }
-    }
-
-    private void onAdd(Entity entity) throws FalconException {
-        for (ConfigurationChangeListener listener : listeners) {
-            listener.onAdd(entity);
-        }
-    }
-
-    private void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        for (ConfigurationChangeListener listener : listeners) {
-            listener.onChange(oldEntity, newEntity);
-        }
-    }
-
-    private void onReload(Entity entity) throws FalconException {
-        for (ConfigurationChangeListener listener : listeners) {
-            listener.onReload(entity);
-        }
-    }
-
-    /**
-     * Stages {@code entity} as this thread's in-progress update. Fails if the entity does not
-     * exist or this thread already has an update staged.
-     */
-    public synchronized void initiateUpdate(Entity entity) throws FalconException {
-        if (get(entity.getEntityType(), entity.getName()) == null || updatesInProgress.get() != null) {
-            throw new FalconException(
-                "An update for " + entity.toShortString() + " is already in progress or doesn't exist");
-        }
-        updatesInProgress.set(entity);
-    }
-
-    /**
-     * @param type - Entity type that is being retrieved
-     * @param name - Name as it appears in the entity xml definition
-     * @param <T> - Actual Entity object type
-     * @return - the entity staged by this thread's in-progress update if it matches, otherwise
-     *         the entity from the in-memory dictionary, or null if none is registered. If the
-     *         dictionary holds the internal placeholder and persistence is enabled, the entity
-     *         is restored from the persistent store just in time.
-     * @throws FalconException
-     */
-    @SuppressWarnings("unchecked")
-    public <T extends Entity> T get(EntityType type, String name) throws FalconException {
-        ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
-        if (!entityMap.containsKey(name)) {
-            return null;
-        }
-
-        Entity inProgress = updatesInProgress.get();
-        if (inProgress != null && inProgress.getEntityType() == type && inProgress.getName().equals(name)) {
-            return (T) inProgress;
-        }
-
-        T entity = (T) entityMap.get(name);
-        if (entity == NULL && shouldPersist) { // Object identity check against the placeholder
-            try {
-                entity = this.restore(type, name);
-            } catch (IOException e) {
-                throw new StoreAccessException(e);
-            }
-            entityMap.put(name, entity);
-        }
-        return entity;
-    }
-
-    public Collection<String> getEntities(EntityType type) {
-        return Collections.unmodifiableCollection(dictionary.get(type).keySet());
-    }
-
-    /**
-     * Remove an entity which is already stored in the config store.
-     *
-     * @param type - Entity type being removed
-     * @param name - Name of the entity object being removed
-     * @return - True is remove is successful, false if request entity doesn't
-     *         exist
-     * @throws FalconException
-     */
-    public synchronized boolean remove(EntityType type, String name) throws FalconException {
-        Map<String, Entity> entityMap = dictionary.get(type);
-        if (!entityMap.containsKey(name)) {
-            return false;
-        }
-        try {
-            archive(type, name);
-            Entity entity = entityMap.get(name);
-            onRemove(entity);
-            entityMap.remove(name);
-        } catch (IOException e) {
-            throw new StoreAccessException(e);
-        }
-        AUDIT.info(type + " " + name + " is removed from config store");
-        return true;
-    }
-
-    private void onRemove(Entity entity) throws FalconException {
-        for (ConfigurationChangeListener listener : listeners) {
-            listener.onRemove(entity);
-        }
-    }
-
-    /**
-     * Location of an entity's definition in the persistent store:
-     * {@code <storePath>/<type>/<url-encoded name>.xml}.
-     */
-    private Path getEntityPath(EntityType type, String name) throws IOException {
-        return new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml");
-    }
-
-    /**
-     * @param type - Entity type that is to be stored into persistent storage
-     * @param entity - entity to persist. JAXB Annotated entity will be marshalled
-     *            to the persistent store. The convention used for storing the
-     *            object:: PROP(config.store.uri)/{entitytype}/{entityname}.xml
-     * @throws java.io.IOException If any error in accessing the storage
-     * @throws FalconException
-     */
-    private void persist(EntityType type, Entity entity) throws IOException, FalconException {
-        if (!shouldPersist) {
-            return;
-        }
-        try (OutputStream out = fs.create(getEntityPath(type, entity.getName()))) {
-            type.getMarshaller().marshal(entity, out);
-            LOG.info("Persisted configuration {}/{}", type, entity.getName());
-        } catch (JAXBException e) {
-            LOG.error("Unable to serialize the entity object {}/{}", type, entity.getName(), e);
-            throw new StoreAccessException("Unable to serialize the entity object " + type + "/" + entity.getName(), e);
-        }
-    }
-
-    /**
-     * Archive removed configuration in the persistent store.
-     * The definition is renamed to {@code <storePath>/archive/<type>/<url-encoded name>.<millis>}.
-     *
-     * @param type - Entity type to archive
-     * @param name - name
-     * @throws IOException If any error in accessing the storage
-     */
-    private void archive(EntityType type, String name) throws IOException {
-        if (!shouldPersist) {
-            return;
-        }
-        Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
-        HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION);
-        Path source = getEntityPath(type, name);
-        Path target = new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis());
-        if (fs.rename(source, target)) {
-            LOG.info("Archived configuration {}/{}", type, name);
-        } else {
-            // rename reports failure by return value; the definition may be restored on restart.
-            LOG.warn("Unable to archive configuration {}/{} from {} to {}", type, name, source, target);
-        }
-    }
-
-    /**
-     * @param type - Entity type to restore from persistent store
-     * @param name - Name of the entity to restore.
-     * @param <T> - Actual entity object type
-     * @return - De-serialized entity object restored from persistent store
-     * @throws IOException If any error in accessing the storage
-     * @throws FalconException
-     */
-    @SuppressWarnings("unchecked")
-    private synchronized <T extends Entity> T restore(EntityType type, String name)
-        throws IOException, FalconException {
-
-        try (InputStream in = fs.open(getEntityPath(type, name))) {
-            T entity = (T) type.getUnmarshaller().unmarshal(in);
-            LOG.info("Restored configuration {}/{}", type, name);
-            return entity;
-        } catch (JAXBException e) {
-            throw new StoreAccessException("Unable to un-marshall xml definition for " + type + "/" + name, e);
-        }
-    }
-
-    /** Clears this thread's staged update, if any. */
-    public void cleanupUpdateInit() {
-        updatesInProgress.set(null);
-    }
-
-    @Override
-    public String getName() {
-        return this.getClass().getName();
-    }
-
-    @Override
-    public void destroy() {
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java b/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java
deleted file mode 100644
index 28c5ac0..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Thrown when an entity being added to the config store is already present there.
- */
-public class EntityAlreadyExistsException extends FalconException {
-
-    /**
-     * @param message detail message
-     */
-    public EntityAlreadyExistsException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param message detail message
-     * @param cause   underlying exception
-     */
-    public EntityAlreadyExistsException(String message, Exception cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param cause underlying exception
-     */
-    public EntityAlreadyExistsException(Exception cause) {
-        super(cause);
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
deleted file mode 100644
index a9b7617..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.resource.FeedLookupResult;
-import org.apache.falcon.service.ConfigurationChangeListener;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.falcon.util.FalconRadixUtils;
-import org.apache.falcon.util.RadixTree;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A <Key, Value> Store to store FeedProperties against Feed's Locations.
- *
- * For example:
- * let's say a feed - <b>MyFeed</b>, is configured for two clusters - cluster1 and cluster2 and has data location as
- * below.
- * /projects/myprocess/data/${MONTH}-${DAY}-${HOUR}
- * /projects/myprocess/meta/${MONTH}-${DAY}-${HOUR}
- *
- * then the key,value store will be like below
- * key1: /projects/myprocess/data/${MONTH}-${DAY}-${HOUR}
- * value1: [FeedProperties("cluster1", LocationType.DATA, "MyFeed"),
- * FeedProperties("cluster2", LocationType.DATA, "MyFeed")
- * ]
- *
- * key2: /projects/myprocess/meta/${MONTH}-${DAY}-${HOUR}
- * value2: [FeedProperties("cluster1", LocationType.META, "MyFeed"),
- * FeedProperties("cluster2", LocationType.META, "MyFeed")
- * ]
- *
- * It ensures that no two Feeds share the same location.
- * It can also be used for operations like:
- * <ul>
- * <li>Find if a there is a feed which uses a given path as it's location.</li>
- * <li>Find name of the feed, given it's location.</li>
- * </ul>
- */
-public final class FeedLocationStore implements ConfigurationChangeListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(FeedLocationStore.class);
- protected final FeedPathStore<FeedLookupResult.FeedProperties> store = new
- RadixTree<FeedLookupResult.FeedProperties>();
-
- private static FeedLocationStore instance = new FeedLocationStore();
-
- private FeedLocationStore(){
- }
-
- public static FeedLocationStore get(){
- return instance;
- }
-
- @Override
- public void onAdd(Entity entity) throws FalconException {
- if (entity.getEntityType() == EntityType.FEED){
- Feed feed = (Feed) entity;
- List<Cluster> clusters = feed.getClusters().getClusters();
- for(Cluster cluster: clusters) {
- if (DeploymentUtil.getCurrentClusters().contains(cluster.getName())) {
- List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed,
- cluster.getName()), feed);
- if (clusterSpecificLocations != null) {
- for (Location location : clusterSpecificLocations) {
- if (location != null && StringUtils.isNotBlank(location.getPath())) {
- FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(
- feed.getName(), location.getType(), cluster.getName());
- store.insert(StringUtils.trim(location.getPath()), value);
- LOG.debug("Inserted location: {} for feed: {} and cluster: {}",
- location.getPath(), feed.getName(), cluster.getName());
- }
- }
- }
- }
- }
- }
- }
-
- /**
- * Delete the key(path) from the store if the feed is deleted.
- * @param entity entity object
- * @throws FalconException
- */
- @Override
- public void onRemove(Entity entity) throws FalconException {
- if (entity.getEntityType() == EntityType.FEED){
-
- Feed feed = (Feed) entity;
- List<Cluster> clusters = feed.getClusters().getClusters();
- for(Cluster cluster: clusters){
- List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed,
- cluster.getName()), feed);
- if (clusterSpecificLocations != null) {
- for(Location location: clusterSpecificLocations){
- if (location != null && StringUtils.isNotBlank(location.getPath())){
- FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(feed.getName(),
- location.getType(), cluster.getName());
- LOG.debug("Delete called for location: {} for feed: {} and cluster: {}",
- location.getPath(), feed.getName(), cluster.getName());
- store.delete(location.getPath(), value);
- LOG.debug("Deleted location: {} for feed: {} and cluster: {}",
- location.getPath(), feed.getName(), cluster.getName());
- }
- }
- }
- }
- }
-
- }
-
- /**
- * Delete the old path and insert the new Path when the feed is updated.
- * @param oldEntity old entity object
- * @param newEntity updated entity object
- * @throws FalconException if the new path already exists in the store.
- */
- @Override
- public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
- onRemove(oldEntity);
- onAdd(newEntity);
- }
-
- @Override
- public void onReload(Entity entity) throws FalconException {
- onAdd(entity);
- }
-
-
- public Collection<FeedLookupResult.FeedProperties> reverseLookup(String path) {
- return store.find(path, new FalconRadixUtils.FeedRegexAlgorithm());
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
deleted file mode 100644
index 1be12fe..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.falcon.util.FalconRadixUtils;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import java.util.Collection;
-
-/**
- * A key/value store that holds values against feed location paths.
- *
- * @param <T> type of the values stored against each path
- */
-public interface FeedPathStore<T> {
-
-    /**
-     * Stores {@code value} against {@code key}.
-     * NOTE(review): the key is annotated {@code @Nullable}; how a null key is handled is
-     * implementation-defined - confirm against the implementation.
-     */
-    void insert(@Nullable String key, @Nonnull T value);
-
-    /**
-     * Deletes {@code value} from the entries stored against {@code key}.
-     *
-     * @return presumably {@code true} when something was deleted - confirm against the implementation
-     */
-    boolean delete(@Nonnull String key, @Nonnull T value);
-
-    /**
-     * Looks up the values stored against {@code key} using the implementation's default matching.
-     *
-     * @return matching values, or possibly {@code null}
-     */
-    @Nullable
-    Collection<T> find(@Nonnull String key);
-
-    /**
-     * Looks up the values whose stored key matches {@code key} according to {@code algorithm}.
-     *
-     * @return matching values, or possibly {@code null}
-     */
-    @Nullable
-    Collection<T> find(@Nonnull String key, @Nonnull FalconRadixUtils.INodeAlgorithm algorithm);
-
-    /**
-     * @return the size of the store (NOTE(review): whether this counts keys or values is up to the implementation)
-     */
-    int getSize();
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
deleted file mode 100644
index 318dc2e..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Thrown when there is an issue accessing the persistent store.
- */
-public class StoreAccessException extends FalconException {
-
-    /**
-     * @param cause underlying exception
-     */
-    public StoreAccessException(Exception cause) {
-        super(cause);
-    }
-
-    /**
-     * @param message detail message describing the failed access
-     * @param cause   underlying exception
-     */
-    public StoreAccessException(String message, Exception cause) {
-        super(message, cause);
-    }
-}