Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:26:25 UTC

[39/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
deleted file mode 100644
index c942862..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ /dev/null
@@ -1,719 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.FileSystemStorage;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityGraph;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.ACL;
-import org.apache.falcon.entity.v0.feed.Extract;
-import org.apache.falcon.entity.v0.feed.ExtractMethod;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.MergeType;
-import org.apache.falcon.entity.v0.feed.Properties;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.group.FeedGroup;
-import org.apache.falcon.group.FeedGroupMap;
-import org.apache.falcon.service.LifecyclePolicyMap;
-import org.apache.falcon.util.DateUtil;
-import org.apache.falcon.util.HadoopQueueUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-/**
- * Parser that parses feed entity definition.
- */
-public class FeedEntityParser extends EntityParser<Feed> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FeedEntityParser.class);
-
-    public FeedEntityParser() {
-        super(EntityType.FEED);
-    }
-
-    @Override
-    public void validate(Feed feed) throws FalconException {
-        if (feed.getTimezone() == null) {
-            feed.setTimezone(TimeZone.getTimeZone("UTC"));
-        }
-
-        if (feed.getClusters() == null) {
-            throw new ValidationException("Feed should have at least one cluster");
-        }
-
-        validateLifecycle(feed);
-        validateACL(feed);
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            validateEntityExists(EntityType.CLUSTER, cluster.getName());
-
-            // Optional end_date
-            if (cluster.getValidity().getEnd() == null) {
-                cluster.getValidity().setEnd(DateUtil.NEVER);
-            }
-
-            validateClusterValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd(),
-                    cluster.getName());
-            validateClusterHasRegistry(feed, cluster);
-            validateFeedCutOffPeriod(feed, cluster);
-            if (FeedHelper.isImportEnabled(cluster)) {
-                validateEntityExists(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
-                validateFeedExtractionType(feed, cluster);
-                validateFeedImportArgs(cluster);
-                validateFeedImportFieldExcludes(cluster);
-            }
-            if (FeedHelper.isExportEnabled(cluster)) {
-                validateEntityExists(EntityType.DATASOURCE, FeedHelper.getExportDatasourceName(cluster));
-                validateFeedExportArgs(cluster);
-                validateFeedExportFieldExcludes(cluster);
-            }
-        }
-
-        validateFeedStorage(feed);
-        validateFeedPath(feed);
-        validateFeedPartitionExpression(feed);
-        validateFeedGroups(feed);
-        validateFeedSLA(feed);
-        validateProperties(feed);
-        validateHadoopQueue(feed);
-
-        // The feed is valid as a new entity on its own.
-        // But is this an update to an existing feed?
-
-        Feed oldFeed = ConfigurationStore.get().get(EntityType.FEED, feed.getName());
-        if (oldFeed == null) {
-            return; // Not an update case
-        }
-
-        // This is an update: iterate over all processes that depend on
-        // this feed and verify they remain valid against the new
-        // feed definition.
-        EntityGraph graph = EntityGraph.get();
-        Set<Entity> referenced = graph.getDependents(oldFeed);
-        Set<Process> processes = findProcesses(referenced);
-        if (processes.isEmpty()) {
-            return;
-        }
-
-        ensureValidityFor(feed, processes);
-    }
-
-    private void validateLifecycle(Feed feed) throws FalconException {
-        LifecyclePolicyMap map = LifecyclePolicyMap.get();
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            if (FeedHelper.isLifecycleEnabled(feed, cluster.getName())) {
-                if (FeedHelper.getRetentionStage(feed, cluster.getName()) == null) {
-                    throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
-                            + cluster.getName());
-                }
-                validateRetentionFrequency(feed, cluster.getName());
-                for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
-                    map.get(policyName).validate(feed, cluster.getName());
-                }
-            }
-        }
-    }
-
-    private void validateRetentionFrequency(Feed feed, String clusterName) throws FalconException {
-        Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, clusterName);
-        Frequency feedFrequency = feed.getFrequency();
-        if (DateUtil.getFrequencyInMillis(retentionFrequency) < DateUtil.getFrequencyInMillis(feedFrequency)) {
-            throw new ValidationException("Retention can not be more frequent than data availability.");
-        }
-    }
-
-    private Set<Process> findProcesses(Set<Entity> referenced) {
-        Set<Process> processes = new HashSet<Process>();
-        for (Entity entity : referenced) {
-            if (entity.getEntityType() == EntityType.PROCESS) {
-                processes.add((Process) entity);
-            }
-        }
-        return processes;
-    }
-
-    private void validateFeedSLA(Feed feed) throws FalconException {
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            Sla clusterSla = FeedHelper.getSLA(cluster, feed);
-            if (clusterSla != null) {
-                Frequency slaLowExpression = clusterSla.getSlaLow();
-                ExpressionHelper evaluator = ExpressionHelper.get();
-                ExpressionHelper.setReferenceDate(new Date());
-                Date slaLow = new Date(evaluator.evaluate(slaLowExpression.toString(), Long.class));
-
-                Frequency slaHighExpression = clusterSla.getSlaHigh();
-                Date slaHigh = new Date(evaluator.evaluate(slaHighExpression.toString(), Long.class));
-
-                if (slaLow.after(slaHigh)) {
-                    throw new ValidationException("slaLow of Feed: " + slaLowExpression
-                            + "is greater than slaHigh: " + slaHighExpression
-                            + " for cluster: " + cluster.getName()
-                    );
-                }
-
-                // test that slaHigh is less than retention
-                Frequency retentionExpression = cluster.getRetention().getLimit();
-                Date retention = new Date(evaluator.evaluate(retentionExpression.toString(), Long.class));
-                if (slaHigh.after(retention)) {
-                    throw new ValidationException("slaHigh of Feed: " + slaHighExpression
-                            + " is greater than retention of the feed: " + retentionExpression
-                            + " for cluster: " + cluster.getName()
-                    );
-                }
-
-            }
-        }
-    }
-
-    private void validateFeedGroups(Feed feed) throws FalconException {
-        String[] groupNames = feed.getGroups() != null ? feed.getGroups().split(",") : new String[]{};
-        final Storage storage = FeedHelper.createStorage(feed);
-        String defaultPath = storage.getUriTemplate(LocationType.DATA);
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            final String uriTemplate = FeedHelper.createStorage(cluster, feed).getUriTemplate(LocationType.DATA);
-            if (!FeedGroup.getDatePattern(uriTemplate).equals(
-                    FeedGroup.getDatePattern(defaultPath))) {
-                throw new ValidationException("Feeds default path pattern: "
-                        + storage.getUriTemplate(LocationType.DATA)
-                        + ", does not match with cluster: "
-                        + cluster.getName()
-                        + " path pattern: "
-                        + uriTemplate);
-            }
-        }
-        for (String groupName : groupNames) {
-            FeedGroup group = FeedGroupMap.get().getGroupsMapping().get(groupName);
-            if (group != null && !group.canContainFeed(feed)) {
-                throw new ValidationException(
-                        "Feed " + feed.getName() + "'s frequency: " + feed.getFrequency().toString()
-                                + ", path pattern: " + storage
-                                + " does not match with group: " + group.getName() + "'s frequency: "
-                                + group.getFrequency()
-                                + ", date pattern: " + group.getDatePattern());
-            }
-        }
-    }
-
-    private void ensureValidityFor(Feed newFeed, Set<Process> processes) throws FalconException {
-        for (Process process : processes) {
-            try {
-                ensureValidityFor(newFeed, process);
-            } catch (FalconException e) {
-                throw new ValidationException(
-                        "Process " + process.getName() + " is not compatible " + "with changes to feed "
-                                + newFeed.getName(), e);
-            }
-        }
-    }
-
-    private void ensureValidityFor(Feed newFeed, Process process) throws FalconException {
-        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
-            String clusterName = cluster.getName();
-            if (process.getInputs() != null) {
-                for (Input input : process.getInputs().getInputs()) {
-                    if (!input.getFeed().equals(newFeed.getName())) {
-                        continue;
-                    }
-                    CrossEntityValidations.validateFeedDefinedForCluster(newFeed, clusterName);
-                    CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), newFeed, clusterName);
-                    CrossEntityValidations.validateInstanceRange(process, input, newFeed);
-
-                    validateInputPartition(newFeed, input);
-                }
-            }
-
-            if (process.getOutputs() != null) {
-                for (Output output : process.getOutputs().getOutputs()) {
-                    if (!output.getFeed().equals(newFeed.getName())) {
-                        continue;
-                    }
-                    CrossEntityValidations.validateFeedDefinedForCluster(newFeed, clusterName);
-                    CrossEntityValidations.validateInstance(process, output, newFeed);
-                }
-            }
-            LOG.debug("Verified and found {} to be valid for new definition of {}",
-                    process.getName(), newFeed.getName());
-        }
-    }
-
-    private void validateInputPartition(Feed newFeed, Input input) throws FalconException {
-        if (input.getPartition() == null) {
-            return;
-        }
-
-        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(newFeed);
-        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
-            CrossEntityValidations.validateInputPartition(input, newFeed);
-        } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
-            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
-        }
-    }
-
-    private void validateClusterValidity(Date start, Date end, String clusterName) throws FalconException {
-        try {
-            if (start.after(end)) {
-                throw new ValidationException("Feed start time: " + start + " cannot be after feed end time: " + end
-                        + " for cluster: " + clusterName);
-            }
-        } catch (ValidationException e) {
-            throw new ValidationException(e);
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
-    private void validateFeedCutOffPeriod(Feed feed, Cluster cluster) throws FalconException {
-        ExpressionHelper evaluator = ExpressionHelper.get();
-
-        String feedRetention = cluster.getRetention().getLimit().toString();
-        long retentionPeriod = evaluator.evaluate(feedRetention, Long.class);
-
-        if (feed.getLateArrival() == null) {
-            LOG.debug("Feed's late arrival cut-off not set");
-            return;
-        }
-        String feedCutoff = feed.getLateArrival().getCutOff().toString();
-        long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class);
-
-        if (retentionPeriod < feedCutOffPeriod) {
-            throw new ValidationException(
-                    "Feed's retention limit: " + feedRetention + " of referenced cluster " + cluster.getName()
-                            + " should be more than feed's late arrival cut-off period: " + feedCutoff + " for feed: "
-                            + feed.getName());
-        }
-    }
-
-    private void validateFeedPartitionExpression(Feed feed) throws FalconException {
-        int numSourceClusters = 0, numTrgClusters = 0;
-        Set<String> clusters = new HashSet<String>();
-        for (Cluster cl : feed.getClusters().getClusters()) {
-            if (!clusters.add(cl.getName())) {
-                throw new ValidationException("Cluster: " + cl.getName()
-                        + " is defined more than once for feed: " + feed.getName());
-            }
-            if (cl.getType() == ClusterType.SOURCE) {
-                numSourceClusters++;
-            } else if (cl.getType() == ClusterType.TARGET) {
-                numTrgClusters++;
-            }
-        }
-
-        if (numTrgClusters >= 1 && numSourceClusters == 0) {
-            throw new ValidationException("Feed: " + feed.getName()
-                    + " should have atleast one source cluster defined");
-        }
-
-        int feedParts = feed.getPartitions() != null ? feed.getPartitions().getPartitions().size() : 0;
-
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-
-            if (cluster.getType() == ClusterType.SOURCE && numSourceClusters > 1 && numTrgClusters >= 1) {
-                String part = FeedHelper.normalizePartitionExpression(cluster.getPartition());
-                if (StringUtils.split(part, '/').length == 0) {
-                    throw new ValidationException(
-                            "Partition expression has to be specified for cluster " + cluster.getName()
-                                    + " as there are more than one source clusters");
-                }
-                validateClusterExpDefined(cluster);
-
-            } else if (cluster.getType() == ClusterType.TARGET) {
-
-                for (Cluster src : feed.getClusters().getClusters()) {
-                    if (src.getType() == ClusterType.SOURCE) {
-                        String part = FeedHelper.normalizePartitionExpression(src.getPartition(),
-                                cluster.getPartition());
-                        int numParts = StringUtils.split(part, '/').length;
-                        if (numParts > feedParts) {
-                            throw new ValidationException(
-                                    "Partition for " + src.getName() + " and " + cluster.getName()
-                                            + "clusters is more than the number of partitions defined in feed");
-                        }
-                    }
-                }
-
-                if (numTrgClusters > 1 && numSourceClusters >= 1) {
-                    validateClusterExpDefined(cluster);
-                }
-            }
-        }
-    }
-
-    private void validateClusterExpDefined(Cluster cl) throws FalconException {
-        if (cl.getPartition() == null) {
-            return;
-        }
-
-        org.apache.falcon.entity.v0.cluster.Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, cl.getName());
-        String part = FeedHelper.normalizePartitionExpression(cl.getPartition());
-        if (FeedHelper.evaluateClusterExp(cluster, part).equals(part)) {
-            throw new ValidationException(
-                    "Alteast one of the partition tags has to be a cluster expression for cluster " + cl.getName());
-        }
-    }
-
-    /**
-     * Ensure table is already defined in the catalog registry.
-     * Does not matter for FileSystem storage.
-     */
-    private void validateFeedStorage(Feed feed) throws FalconException {
-        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
-        validateMultipleSourcesExist(feed, baseFeedStorageType);
-        validateUniformStorageType(feed, baseFeedStorageType);
-        validatePartitions(feed, baseFeedStorageType);
-        validateStorageExists(feed);
-    }
-
-    private void validateMultipleSourcesExist(Feed feed, Storage.TYPE baseFeedStorageType) throws FalconException {
-        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
-            return;
-        }
-
-        // validate that there is only one source cluster
-        int numberOfSourceClusters = 0;
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            if (cluster.getType() == ClusterType.SOURCE) {
-                numberOfSourceClusters++;
-            }
-        }
-
-        if (numberOfSourceClusters > 1) {
-            throw new ValidationException("Multiple sources are not supported for feed with table storage: "
-                    + feed.getName());
-        }
-    }
-
-    private void validateUniformStorageType(Feed feed, Storage.TYPE feedStorageType) throws FalconException {
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);
-
-            if (feedStorageType != feedClusterStorageType) {
-                throw new ValidationException("The storage type is not uniform for cluster: " + cluster.getName());
-            }
-        }
-    }
-
-    private void validateClusterHasRegistry(Feed feed, Cluster cluster) throws FalconException {
-        Storage.TYPE feedClusterStorageType = FeedHelper.getStorageType(feed, cluster);
-
-        if (feedClusterStorageType != Storage.TYPE.TABLE) {
-            return;
-        }
-
-        org.apache.falcon.entity.v0.cluster.Cluster clusterEntity = EntityUtil.getEntity(EntityType.CLUSTER,
-                cluster.getName());
-        if (ClusterHelper.getRegistryEndPoint(clusterEntity) == null) {
-            throw new ValidationException("Cluster should have registry interface defined: " + clusterEntity.getName());
-        }
-    }
-
-    private void validatePartitions(Feed feed, Storage.TYPE storageType) throws  FalconException {
-        if (storageType == Storage.TYPE.TABLE && feed.getPartitions() != null) {
-            throw new ValidationException("Partitions are not supported for feeds with table storage. "
-                    + "It should be defined as part of the table URI. "
-                    + feed.getName());
-        }
-    }
-
-    private void validateStorageExists(Feed feed) throws FalconException {
-        StringBuilder buffer = new StringBuilder();
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                    EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
-            if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
-                continue;
-            }
-
-            final Storage storage = FeedHelper.createStorage(cluster, feed);
-            // The existence check only applies to table storage; filesystem storage always passes.
-            if (storage.getType() == Storage.TYPE.FILESYSTEM) {
-                continue;
-            }
-
-            CatalogStorage catalogStorage = (CatalogStorage) storage;
-            Configuration clusterConf = ClusterHelper.getConfiguration(clusterEntity);
-            if (!CatalogServiceFactory.getCatalogService().tableExists(
-                    clusterConf, catalogStorage.getCatalogUrl(),
-                    catalogStorage.getDatabase(), catalogStorage.getTable())) {
-                buffer.append("Table [")
-                        .append(catalogStorage.getTable())
-                        .append("] does not exist for feed: ")
-                        .append(feed.getName())
-                        .append(" in cluster: ")
-                        .append(cluster.getName());
-            }
-        }
-
-        if (buffer.length() > 0) {
-            throw new ValidationException(buffer.toString());
-        }
-    }
-
-    /**
-     * Validate ACL if authorization is enabled.
-     *
-     * @param feed Feed entity
-     * @throws ValidationException
-     */
-    protected void validateACL(Feed feed) throws FalconException {
-        if (isAuthorizationDisabled) {
-            return;
-        }
-
-        final ACL feedACL = feed.getACL();
-        validateACLOwnerAndGroup(feedACL);
-        try {
-            authorize(feed.getName(), feedACL);
-        } catch (AuthorizationException e) {
-            throw new ValidationException(e);
-        }
-
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                    EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
-            if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
-                continue;
-            }
-
-            final Storage storage = FeedHelper.createStorage(cluster, feed);
-            try {
-                storage.validateACL(feedACL);
-            } catch(FalconException e) {
-                throw new ValidationException(e);
-            }
-        }
-    }
-
-    /**
-     * Validate the Hadoop cluster queue names specified in the Feed entity definition.
-     *
-     * First looks for a queue name specified in the Lifecycle stage, then falls back to
-     * the queueName property, and checks validity against the Hadoop cluster scheduler info.
-     *
-     * The queue is validated only if a YARN RM web address is specified in the
-     * cluster entity properties.
-     *
-     * Throws an exception if the specified queue name is not a valid Hadoop cluster queue.
-     *
-     * @param feed Feed entity
-     * @throws FalconException
-     */
-    protected void validateHadoopQueue(Feed feed) throws FalconException {
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            Set<String> feedQueue = getQueueNamesUsedInFeed(feed, cluster);
-
-            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                    EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
-
-            String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
-            if (StringUtils.isBlank(rmURL)) {
-                rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
-            }
-
-            if (StringUtils.isNotBlank(rmURL)) {
-                LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL);
-                Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);
-
-                for (String q: feedQueue) {
-                    if (queueNames.contains(q)) {
-                        LOG.info("Validated presence of retention queue specified in feed - {}", q);
-                    } else {
-                        String strMsg = String.format("The hadoop queue name %s specified "
-                                + "for cluster %s is invalid.", q, cluster.getName());
-                        LOG.info(strMsg);
-                        throw new FalconException(strMsg);
-                    }
-                }
-            }
-        }
-    }
-
-    protected Set<String> getQueueNamesUsedInFeed(Feed feed, Cluster cluster) throws FalconException {
-        Set<String> queueList = new HashSet<>();
-        addToQueueList(FeedHelper.getRetentionQueue(feed, cluster), queueList);
-        if (cluster.getType() == ClusterType.TARGET) {
-            addToQueueList(FeedHelper.getReplicationQueue(feed, cluster), queueList);
-        }
-        return queueList;
-    }
-
-    private void addToQueueList(String queueName, Set<String> queueList) {
-        if (StringUtils.isNotBlank(queueName)) {
-            queueList.add(queueName);
-        }
-    }
-
-    protected void validateProperties(Feed feed) throws ValidationException {
-        Properties properties = feed.getProperties();
-        if (properties == null) {
-            return; // feed has no properties to validate.
-        }
-
-        List<Property> propertyList = feed.getProperties().getProperties();
-        HashSet<String> propertyKeys = new HashSet<String>();
-        for (Property prop : propertyList) {
-            if (StringUtils.isBlank(prop.getName())) {
-                throw new ValidationException("Property name and value cannot be empty for Feed : "
-                        + feed.getName());
-            }
-            if (!propertyKeys.add(prop.getName())) {
-                throw new ValidationException("Multiple properties with same name found for Feed : "
-                        + feed.getName());
-            }
-        }
-    }
-
-    /**
-     * Validate if FileSystem based feed contains location type data.
-     *
-     * @param feed Feed entity
-     * @throws FalconException
-     */
-    private void validateFeedPath(Feed feed) throws FalconException {
-        if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
-            return;
-        }
-
-        for (Cluster cluster : feed.getClusters().getClusters()) {
-            List<Location> locations = FeedHelper.getLocations(cluster, feed);
-            Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
-
-            if (dataLocation == null) {
-                throw new ValidationException(feed.getName() + " is a FileSystem based feed "
-                    + "but does not contain a location of type data in cluster " + cluster.getName());
-            }
-        }
-    }
-
-    /**
-     * Validate the extraction and merge type combination. Currently supported combinations:
-     *
-     * ExtractionType = FULL and MergeType = SNAPSHOT.
-     * ExtractionType = INCREMENTAL and MergeType = APPEND.
-     *
-     * @param feed Feed entity
-     * @param cluster Cluster referenced in the Feed definition
-     * @throws FalconException
-     */
-    private void validateFeedExtractionType(Feed feed, Cluster cluster) throws FalconException {
-        Extract extract = cluster.getImport().getSource().getExtract();
-
-        if (ExtractMethod.FULL == extract.getType())  {
-            if ((MergeType.SNAPSHOT != extract.getMergepolicy())
-                    || (extract.getDeltacolumn() != null)) {
-                throw new ValidationException(String.format("Feed %s is using FULL "
-                        + "extract method but specifies either a superfluous "
-                        + "deltacolumn  or a mergepolicy other than snapshot", feed.getName()));
-            }
-        }  else {
-            throw new ValidationException(String.format("Feed %s is using unsupported "
-                    + "extraction mechanism %s", feed.getName(), extract.getType().value()));
-        }
-    }
-
-    /**
-     * Validate import arguments.
-     * @param feedCluster Cluster referenced in the feed
-     */
-    private void validateFeedImportArgs(Cluster feedCluster) throws FalconException {
-        Map<String, String> args = FeedHelper.getImportArguments(feedCluster);
-        validateSqoopArgs(args);
-    }
-
-    /**
-     * Validate sqoop arguments.
-     * @param args Map<String, String> arguments
-     */
-    private void validateSqoopArgs(Map<String, String> args) throws FalconException {
-        int numMappers = 1;
-        if (args.containsKey("--num-mappers")) {
-            numMappers = Integer.parseInt(args.get("--num-mappers"));
-        }
-        if ((numMappers > 1) && (!args.containsKey("--split-by"))) {
-            throw new ValidationException(String.format("Feed import expects "
-                    + "--split-by column when --num-mappers > 1"));
-        }
-    }
-
-    private void validateFeedImportFieldExcludes(Cluster feedCluster) throws FalconException {
-        if (FeedHelper.isFieldExcludes(feedCluster.getImport().getSource())) {
-            throw new ValidationException(String.format("Field excludes are not supported "
-                    + "currently in Feed import policy"));
-        }
-    }
-
-    /**
-     * Validate export arguments.
-     * @param feedCluster Cluster referenced in the feed
-     */
-    private void validateFeedExportArgs(Cluster feedCluster) throws FalconException {
-        Map<String, String> args = FeedHelper.getExportArguments(feedCluster);
-        Map<String, String> validArgs = new HashMap<>();
-        validArgs.put("--num-mappers", "");
-        validArgs.put("--update-key" , "");
-        validArgs.put("--input-null-string", "");
-        validArgs.put("--input-null-non-string", "");
-
-        for(Map.Entry<String, String> e : args.entrySet()) {
-            if (!validArgs.containsKey(e.getKey())) {
-                throw new ValidationException(String.format("Feed export argument %s is invalid.", e.getKey()));
-            }
-        }
-    }
-
-    private void validateFeedExportFieldExcludes(Cluster feedCluster) throws FalconException {
-        if (FeedHelper.isFieldExcludes(feedCluster.getExport().getTarget())) {
-            throw new ValidationException(String.format("Field excludes are not supported "
-                    + "currently in Feed import policy"));
-        }
-    }
-
-}
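
For context, a minimal sketch of how this now-removed parser was typically driven. The EntityParserFactory lookup mirrors the factory that lived alongside this class in the same package; the wrapper class below and its names are illustrative, not part of the removed code:

    import java.io.InputStream;

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.parser.EntityParserFactory;
    import org.apache.falcon.entity.parser.FeedEntityParser;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.feed.Feed;

    // Illustrative sketch: parse a feed XML and run the semantic checks above.
    public final class FeedValidationSketch {
        private FeedValidationSketch() {}

        public static Feed parseAndValidate(InputStream feedXml) throws FalconException {
            // getParser is assumed to resolve FEED to FeedEntityParser, as elsewhere in Falcon.
            FeedEntityParser parser =
                    (FeedEntityParser) EntityParserFactory.getParser(EntityType.FEED);
            Feed feed = parser.parse(feedXml); // JAXB unmarshal plus schema validation
            parser.validate(feed);             // the cluster/storage/SLA/queue checks above
            return feed;
        }
    }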

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
deleted file mode 100644
index 16fd8b3..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.process.Properties;
-import org.apache.falcon.entity.v0.process.Property;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.ACL;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.util.DateUtil;
-import org.apache.falcon.util.HadoopQueueUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-/**
- * Concrete Parser which has XML parsing and validation logic for Process XML.
- */
-public class ProcessEntityParser extends EntityParser<Process> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ProcessEntityParser.class);
-
-    public ProcessEntityParser() {
-        super(EntityType.PROCESS);
-    }
-
-    @Override
-    public void validate(Process process) throws FalconException {
-        if (process.getTimezone() == null) {
-            process.setTimezone(TimeZone.getTimeZone("UTC"));
-        }
-
-        validateACL(process);
-        // check if dependent entities exist
-        Set<String> clusters = new HashSet<String>();
-        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
-            String clusterName = cluster.getName();
-            if (!clusters.add(cluster.getName())) {
-                throw new ValidationException("Cluster: " + cluster.getName()
-                        + " is defined more than once for process: " + process.getName());
-            }
-            validateEntityExists(EntityType.CLUSTER, clusterName);
-
-            // Optional end_date
-            if (cluster.getValidity().getEnd() == null) {
-                cluster.getValidity().setEnd(DateUtil.NEVER);
-            }
-
-            validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
-            validateHDFSPaths(process, clusterName);
-            validateProperties(process);
-
-            if (process.getInputs() != null) {
-                for (Input input : process.getInputs().getInputs()) {
-                    validateEntityExists(EntityType.FEED, input.getFeed());
-                    Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
-                    CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
-                    CrossEntityValidations.validateFeedRetentionPeriod(input.getStart(), feed, clusterName);
-                    CrossEntityValidations.validateInstanceRange(process, input, feed);
-                    validateInputPartition(input, feed);
-                    validateOptionalInputsForTableStorage(feed, input);
-                }
-            }
-
-            if (process.getOutputs() != null) {
-                for (Output output : process.getOutputs().getOutputs()) {
-                    validateEntityExists(EntityType.FEED, output.getFeed());
-                    Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
-                    CrossEntityValidations.validateFeedDefinedForCluster(feed, clusterName);
-                    CrossEntityValidations.validateInstance(process, output, feed);
-                }
-            }
-        }
-        validateDatasetName(process.getInputs(), process.getOutputs());
-        validateLateInputs(process);
-        validateProcessSLA(process);
-        validateHadoopQueue(process);
-    }
-
-
-    private void validateProcessSLA(Process process) throws FalconException {
-        if (process.getSla() != null) {
-            ExpressionHelper evaluator = ExpressionHelper.get();
-            ExpressionHelper.setReferenceDate(new Date());
-            Frequency shouldStartExpression = process.getSla().getShouldStartIn();
-            Frequency shouldEndExpression = process.getSla().getShouldEndIn();
-            Frequency timeoutExpression = process.getTimeout();
-
-            if (shouldStartExpression != null){
-                Date shouldStart = new Date(evaluator.evaluate(shouldStartExpression.toString(), Long.class));
-
-                if (shouldEndExpression != null) {
-                    Date shouldEnd = new Date(evaluator.evaluate(shouldEndExpression.toString(), Long.class));
-                    if (shouldStart.after(shouldEnd)) {
-                        throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression
-                                + "is greater than shouldEndIn: "
-                                + shouldEndExpression);
-                    }
-                }
-
-                if (timeoutExpression != null) {
-                    Date timeout = new Date(evaluator.evaluate(timeoutExpression.toString(), Long.class));
-                    if (timeout.before(shouldStart)) {
-                        throw new ValidationException("shouldStartIn of Process: " + shouldStartExpression
-                                + " is greater than timeout: " + process.getTimeout());
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Validate if the user submitting this entity has access to the specific dirs on HDFS.
-     *
-     * @param process process
-     * @param clusterName cluster the process is materialized on
-     * @throws FalconException
-     */
-    private void validateHDFSPaths(Process process, String clusterName) throws FalconException {
-        org.apache.falcon.entity.v0.cluster.Cluster cluster =
-                ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
-
-        if (!EntityUtil.responsibleFor(cluster.getColo())) {
-            return;
-        }
-
-        String workflowPath = process.getWorkflow().getPath();
-        String libPath = process.getWorkflow().getLib();
-        String nameNode = getNameNode(cluster);
-        try {
-            Configuration configuration = ClusterHelper.getConfiguration(cluster);
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(configuration);
-            if (!fs.exists(new Path(workflowPath))) {
-                throw new ValidationException(
-                        "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
-            }
-
-            if (StringUtils.isNotBlank(libPath)) {
-                String[] libPaths = libPath.split(EntityUtil.WF_LIB_SEPARATOR);
-                for (String path : libPaths) {
-                    if (!fs.exists(new Path(path))) {
-                        throw new ValidationException("Lib path: " + path + " does not exists in HDFS: " + nameNode);
-                    }
-                }
-            }
-        } catch (IOException e) {
-            throw new FalconException("Error validating workflow path " + workflowPath, e);
-        }
-    }
-
-    private String getNameNode(Cluster cluster) throws ValidationException {
-        // cluster should never be null as it is validated at entity submission time.
-        if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
-            throw new ValidationException(
-                    "Cannot get valid nameNode scheme from write interface of cluster: " + cluster.getName());
-        }
-        return ClusterHelper.getStorageUrl(cluster);
-    }
-
-    private void validateProcessValidity(Date start, Date end) throws FalconException {
-        try {
-            if (!start.before(end)) {
-                throw new ValidationException(
-                        "Process start time: " + start + " should be before process end time: " + end);
-            }
-        } catch (ValidationException e) {
-            throw new ValidationException(e);
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
-    private void validateInputPartition(Input input, Feed feed) throws FalconException {
-        if (input.getPartition() == null) {
-            return;
-        }
-
-        final Storage.TYPE baseFeedStorageType = FeedHelper.getStorageType(feed);
-        if (baseFeedStorageType == Storage.TYPE.FILESYSTEM) {
-            CrossEntityValidations.validateInputPartition(input, feed);
-        } else if (baseFeedStorageType == Storage.TYPE.TABLE) {
-            throw new ValidationException("Input partitions are not supported for table storage: " + input.getName());
-        }
-    }
-
-    private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
-        Set<String> datasetNames = new HashSet<String>();
-        if (inputs != null) {
-            for (Input input : inputs.getInputs()) {
-                if (!datasetNames.add(input.getName())) {
-                    throw new ValidationException("Input name: " + input.getName() + " is already used");
-                }
-            }
-        }
-
-        if (outputs != null) {
-            for (Output output : outputs.getOutputs()) {
-                if (!datasetNames.add(output.getName())) {
-                    throw new ValidationException("Output name: " + output.getName() + " is already used");
-                }
-            }
-        }
-    }
-
-    private void validateLateInputs(Process process) throws ValidationException {
-        if (process.getLateProcess() == null) {
-            return;
-        }
-
-        Map<String, String> feeds = new HashMap<String, String>();
-        if (process.getInputs() != null) {
-            for (Input in : process.getInputs().getInputs()) {
-                feeds.put(in.getName(), in.getFeed());
-            }
-        }
-
-        for (LateInput lp : process.getLateProcess().getLateInputs()) {
-            if (!feeds.containsKey(lp.getInput())) {
-                throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
-            }
-
-            try {
-                Feed feed = ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
-                if (feed.getLateArrival() == null) {
-                    throw new ValidationException(
-                            "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
-                }
-            } catch (FalconException e) {
-                throw new ValidationException(e);
-            }
-        }
-    }
-
-    private void validateOptionalInputsForTableStorage(Feed feed, Input input) throws FalconException {
-        if (input.isOptional() && FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
-            throw new ValidationException("Optional Input is not supported for feeds with table storage! "
-                    + input.getName());
-        }
-    }
-
-    /**
-     * Validate ACL if authorization is enabled.
-     *
-     * @param process process entity
-     * @throws ValidationException
-     */
-    protected void validateACL(Process process) throws FalconException {
-        if (isAuthorizationDisabled) {
-            return;
-        }
-
-        // Validate that the entity owner is the logged-in, authenticated user when authorization is enabled.
-        ACL processACL = process.getACL();
-        if (processACL == null) {
-            throw new ValidationException("Process ACL cannot be empty for:  " + process.getName());
-        }
-
-        validateACLOwnerAndGroup(processACL);
-
-        try {
-            authorize(process.getName(), processACL);
-        } catch (AuthorizationException e) {
-            throw new ValidationException(e);
-        }
-    }
-
-    protected void validateProperties(Process process) throws ValidationException {
-        Properties properties = process.getProperties();
-        if (properties == null) {
-            return; // Process has no properties to validate.
-        }
-
-        List<Property> propertyList = process.getProperties().getProperties();
-        HashSet<String> propertyKeys = new HashSet<String>();
-        for (Property prop : propertyList) {
-            if (StringUtils.isBlank(prop.getName())) {
-                throw new ValidationException("Property name and value cannot be empty for Process : "
-                        + process.getName());
-            }
-            if (!propertyKeys.add(prop.getName())) {
-                throw new ValidationException("Multiple properties with same name found for Process : "
-                        + process.getName());
-            }
-        }
-    }
-
-    private void validateHadoopQueue(Process process) throws FalconException {
-        // get queue name specified in the process entity
-        String processQueueName = null;
-        java.util.Properties props = EntityUtil.getEntityProperties(process);
-        if ((props != null) && (props.containsKey(EntityUtil.MR_QUEUE_NAME))) {
-            processQueueName = props.getProperty(EntityUtil.MR_QUEUE_NAME);
-        } else {
-            return;
-        }
-
-        // iterate through each cluster in process entity to check if the cluster has the process entity queue
-        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
-            String clusterName = cluster.getName();
-            org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
-                    ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
-
-            String rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.https.address");
-            if (rmURL == null) {
-                rmURL = ClusterHelper.getPropertyValue(clusterEntity, "yarn.resourcemanager.webapp.address");
-            }
-
-            if (rmURL != null) {
-                LOG.info("Fetching hadoop queue names from cluster {} RM URL {}", cluster.getName(), rmURL);
-                Set<String> queueNames = HadoopQueueUtil.getHadoopClusterQueueNames(rmURL);
-
-                if (queueNames.contains(processQueueName)) {
-                    LOG.info("Validated presence of queue {} specified in process "
-                            + "entity for cluster {}", processQueueName, clusterName);
-                } else {
-                    String strMsg = String.format("The hadoop queue name %s specified in process "
-                            + "entity for cluster %s is invalid.", processQueueName, cluster.getName());
-                    LOG.info(strMsg);
-                    throw new FalconException(strMsg);
-                }
-            }
-        }
-    }
-
-}
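
Both parsers above repeat the same resource-manager queue check: resolve the RM web address (the HTTPS property first, then the HTTP one) and verify the queue against the scheduler's queue list. A condensed sketch of that shared logic; the standalone helper and its name are illustrative, while the ClusterHelper and HadoopQueueUtil calls are the ones used above:

    import java.util.Set;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.ClusterHelper;
    import org.apache.falcon.util.HadoopQueueUtil;

    // Illustrative helper condensing the queue validation shared by both parsers.
    final class QueueCheckSketch {
        private QueueCheckSketch() {}

        static void ensureQueueExists(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity,
                                      String queue) throws FalconException {
            String rmUrl = ClusterHelper.getPropertyValue(clusterEntity,
                    "yarn.resourcemanager.webapp.https.address");
            if (StringUtils.isBlank(rmUrl)) {
                rmUrl = ClusterHelper.getPropertyValue(clusterEntity,
                        "yarn.resourcemanager.webapp.address");
            }
            if (StringUtils.isBlank(rmUrl)) {
                return; // no RM web address configured: queue validation is skipped
            }
            Set<String> queues = HadoopQueueUtil.getHadoopClusterQueueNames(rmUrl);
            if (!queues.contains(queue)) {
                throw new FalconException("Queue " + queue + " is not defined on cluster "
                        + clusterEntity.getName());
            }
        }
    }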

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java b/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
deleted file mode 100644
index 98f1cb9..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.falcon.FalconException;
-
-/**
- * ValidationException during parsing.
- */
-public class ValidationException extends FalconException {
-
-    public ValidationException(String message) {
-        super(message);
-    }
-
-    public ValidationException(Exception e) {
-        super(e);
-    }
-
-    public ValidationException(String message, Exception e) {
-        super(message, e);
-    }
-
-    private static final long serialVersionUID = -4502166408759507355L;
-
-}
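
The parsers above follow a consistent convention with this exception: rule violations throw it with a message, and lower-level failures (authorization, storage, JAXB) are wrapped so callers can tell a bad definition from a runtime fault. A small illustrative sketch; the helper class and method names are assumptions:

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.parser.ValidationException;

    // Illustrative sketch of the two usage patterns seen in the parsers above.
    final class ValidationSketch {
        private ValidationSketch() {}

        // Message-only form: a definition-level rule was violated.
        static void requireNonEmpty(String value, String what) throws ValidationException {
            if (value == null || value.trim().isEmpty()) {
                throw new ValidationException(what + " cannot be empty");
            }
        }

        // Wrapping form: surface a lower-level failure as a validation error.
        static void rethrowAsValidation(String context, Exception cause) throws FalconException {
            throw new ValidationException(context, cause);
        }
    }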

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
deleted file mode 100644
index 9c7a932..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.commons.codec.CharEncoding;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.service.ConfigurationChangeListener;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Persistent store for falcon entities.
- */
-public final class ConfigurationStore implements FalconService {
-
-    private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[] {
-        EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, EntityType.DATASOURCE, };
-    public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[] { EntityType.PROCESS, EntityType.FEED,
-        EntityType.CLUSTER, };
-
-    private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
-    private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
-    private static final String UTF_8 = CharEncoding.UTF_8;
-    private final boolean shouldPersist;
-
-    private static final FsPermission STORE_PERMISSION =
-            new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-
-    private Set<ConfigurationChangeListener> listeners = new LinkedHashSet<ConfigurationChangeListener>();
-
-    private ThreadLocal<Entity> updatesInProgress = new ThreadLocal<Entity>();
-
-    private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary
-        = new HashMap<EntityType, ConcurrentHashMap<String, Entity>>();
-
-    private static final Entity NULL = new Entity() {
-        @Override
-        public String getName() {
-            return "NULL";
-        }
-
-        @Override
-        public String getTags() { return null; }
-
-        @Override
-        public AccessControlList getACL() {
-            return null;
-        }
-    };
-
-    private static final ConfigurationStore STORE = new ConfigurationStore();
-
-    public static ConfigurationStore get() {
-        return STORE;
-    }
-
-    private FileSystem fs;
-    private Path storePath;
-
-    private ConfigurationStore() {
-        for (EntityType type : EntityType.values()) {
-            dictionary.put(type, new ConcurrentHashMap<String, Entity>());
-        }
-
-        shouldPersist = Boolean.parseBoolean(StartupProperties.get().getProperty("config.store.persist", "true"));
-        if (shouldPersist) {
-            String uri = StartupProperties.get().getProperty("config.store.uri");
-            storePath = new Path(uri);
-            fs = initializeFileSystem();
-        }
-    }
-
-    /**
-     * Falcon owns this directory on HDFS; no one else has permission to read it.
-     *
-     * @return FileSystem handle
-     */
-    private FileSystem initializeFileSystem() {
-        try {
-            FileSystem fileSystem =
-                    HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
-            if (!fileSystem.exists(storePath)) {
-                LOG.info("Creating configuration store directory: {}", storePath);
-                // set permissions so config store dir is owned by falcon alone
-                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
-            }
-
-            return fileSystem;
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to bring up config store for path: " + storePath, e);
-        }
-    }
-
-    @Override
-    public void init() throws FalconException {
-        String listenerClassNames = StartupProperties.get().
-                getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph");
-        for (String listenerClassName : listenerClassNames.split(",")) {
-            listenerClassName = listenerClassName.trim();
-            if (listenerClassName.isEmpty()) {
-                continue;
-            }
-            ConfigurationChangeListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
-            registerListener(listener);
-        }
-
-        if (shouldPersist) {
-            for (final EntityType type : ENTITY_LOAD_ORDER) {
-                loadEntity(type);
-            }
-        }
-    }
-
-    private void loadEntity(final EntityType type) throws FalconException {
-        try {
-            final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
-            FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
-            if (files != null) {
-                final ExecutorService service = Executors.newFixedThreadPool(100);
-                for (final FileStatus file : files) {
-                    service.execute(new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                String fileName = file.getPath().getName();
-                                // drop the ".xml" suffix to recover the encoded entity name
-                                String encodedEntityName = fileName.substring(0, fileName.length() - 4);
-                                String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
-                                Entity entity = restore(type, entityName);
-                                entityMap.put(entityName, entity);
-                            } catch (IOException | FalconException e) {
-                                LOG.error("Unable to restore entity from file: {}", file, e);
-                            }
-                        }
-                    });
-                }
-                service.shutdown();
-                if (service.awaitTermination(10, TimeUnit.MINUTES)) {
-                    LOG.info("Restored Configurations for entity type: {} ", type.name());
-                } else {
-                    LOG.warn("Timed out while waiting for all threads to finish restoring entities "
-                            + "for type: {}", type.name());
-                }
-                // Checking if all entities were loaded
-                if (entityMap.size() != files.length) {
-                    throw new FalconException("Unable to restore configurations for entity type " + type.name());
-                }
-                for (Entity entity : entityMap.values()){
-                    onReload(entity);
-                }
-            }
-        } catch (IOException e) {
-            throw new FalconException("Unable to restore configurations", e);
-        } catch (InterruptedException e) {
-            throw new FalconException("Failed to restore configurations within 10 minutes for entity type " + type.name(), e);
-        }
-    }
-
-    public void registerListener(ConfigurationChangeListener listener) {
-        listeners.add(listener);
-    }
-
-    public void unregisterListener(ConfigurationChangeListener listener) {
-        listeners.remove(listener);
-    }
-
-    /**
-     * @param type   - EntityType that needs to be published
-     * @param entity - Reference to the Entity Object
-     * @throws FalconException
-     */
-    public synchronized void publish(EntityType type, Entity entity) throws FalconException {
-        try {
-            if (get(type, entity.getName()) == null) {
-                persist(type, entity);
-                onAdd(entity);
-                dictionary.get(type).put(entity.getName(), entity);
-            } else {
-                throw new EntityAlreadyExistsException(
-                        entity.toShortString() + " already registered with configuration store. "
-                                + "Can't be submitted again. Try removing before submitting.");
-            }
-        } catch (IOException e) {
-            throw new StoreAccessException(e);
-        }
-        AUDIT.info(type + "/" + entity.getName() + " is published into config store");
-    }
-
-    private synchronized void updateInternal(EntityType type, Entity entity) throws FalconException {
-        try {
-            if (get(type, entity.getName()) != null) {
-                persist(type, entity);
-                ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
-                Entity oldEntity = entityMap.get(entity.getName());
-                onChange(oldEntity, entity);
-                entityMap.put(entity.getName(), entity);
-            } else {
-                throw new FalconException(entity.toShortString() + " doesn't exist");
-            }
-        } catch (IOException e) {
-            throw new StoreAccessException(e);
-        }
-        AUDIT.info(type + "/" + entity.getName() + " is replaced in config store");
-    }
-
-    public synchronized void update(EntityType type, Entity entity) throws FalconException {
-        if (updatesInProgress.get() == entity) {
-            updateInternal(type, entity);
-        } else {
-            throw new FalconException(entity.toShortString() + " is not initialized for update");
-        }
-    }
-
-    private void onAdd(Entity entity) throws FalconException {
-        for (ConfigurationChangeListener listener : listeners) {
-            listener.onAdd(entity);
-        }
-    }
-
-    private void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        for (ConfigurationChangeListener listener : listeners) {
-            listener.onChange(oldEntity, newEntity);
-        }
-    }
-
-    private void onReload(Entity entity) throws FalconException {
-        for (ConfigurationChangeListener listener : listeners) {
-            listener.onReload(entity);
-        }
-    }
-
-    public synchronized void initiateUpdate(Entity entity) throws FalconException {
-        if (get(entity.getEntityType(), entity.getName()) == null || updatesInProgress.get() != null) {
-            throw new FalconException(
-                    entity.toShortString() + " doesn't exist or an update for it is already in progress");
-        }
-        updatesInProgress.set(entity);
-    }
-
-    /**
-     * @param type - Entity type that is being retrieved
-     * @param name - Name as it appears in the entity xml definition
-     * @param <T>  - Actual Entity object type
-     * @return - Entity object from the internal dictionary. If the object is
-     *         not loaded in memory yet, it will be retrieved from the
-     *         persistent store just in time. On startup all the entities are
-     *         added to the dictionary with a null reference.
-     * @throws FalconException
-     */
-    @SuppressWarnings("unchecked")
-    public <T extends Entity> T get(EntityType type, String name) throws FalconException {
-        ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
-        if (entityMap.containsKey(name)) {
-            if (updatesInProgress.get() != null && updatesInProgress.get().getEntityType() == type
-                    && updatesInProgress.get().getName().equals(name)) {
-                return (T) updatesInProgress.get();
-            }
-            T entity = (T) entityMap.get(name);
-            if (entity == NULL && shouldPersist) { // Object equality being checked
-                try {
-                    entity = this.restore(type, name);
-                } catch (IOException e) {
-                    throw new StoreAccessException(e);
-                }
-                entityMap.put(name, entity);
-                return entity;
-            } else {
-                return entity;
-            }
-        } else {
-            return null;
-        }
-    }
-
-    public Collection<String> getEntities(EntityType type) {
-        return Collections.unmodifiableCollection(dictionary.get(type).keySet());
-    }
-
-    /**
-     * Remove an entity which is already stored in the config store.
-     *
-     * @param type - Entity type being removed
-     * @param name - Name of the entity object being removed
-     * @return - true if the removal is successful, false if the requested
-     *         entity doesn't exist
-     * @throws FalconException
-     */
-    public synchronized boolean remove(EntityType type, String name) throws FalconException {
-        Map<String, Entity> entityMap = dictionary.get(type);
-        if (entityMap.containsKey(name)) {
-            try {
-                archive(type, name);
-                Entity entity = entityMap.get(name);
-                onRemove(entity);
-                entityMap.remove(name);
-            } catch (IOException e) {
-                throw new StoreAccessException(e);
-            }
-            AUDIT.info(type + " " + name + " is removed from config store");
-            return true;
-        }
-        return false;
-    }
-
-    private void onRemove(Entity entity) throws FalconException {
-        for (ConfigurationChangeListener listener : listeners) {
-            listener.onRemove(entity);
-        }
-    }
-
-    /**
-     * @param type   - Entity type that is to be stored into persistent storage
-     * @param entity - entity to persist. JAXB Annotated entity will be marshalled
-     *               to the persistent store. The convention used for storing the
-     *               object: PROP(config.store.uri)/{entitytype}/{entityname}.xml
-     * @throws java.io.IOException If any error in accessing the storage
-     * @throws FalconException
-     */
-    private void persist(EntityType type, Entity entity) throws IOException, FalconException {
-        if (!shouldPersist) {
-            return;
-        }
-        OutputStream out = fs
-                .create(new Path(storePath,
-                        type + Path.SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml"));
-        try {
-            type.getMarshaller().marshal(entity, out);
-            LOG.info("Persisted configuration {}/{}", type, entity.getName());
-        } catch (JAXBException e) {
-            LOG.error("Unable to serialize the entity object {}/{}", type, entity.getName(), e);
-            throw new StoreAccessException("Unable to serialize the entity object " + type + "/" + entity.getName(), e);
-        } finally {
-            out.close();
-        }
-    }
-
-    /**
-     * Archive removed configuration in the persistent store.
-     *
-     * @param type - Entity type to archive
-     * @param name - name
-     * @throws IOException If any error in accessing the storage
-     */
-    private void archive(EntityType type, String name) throws IOException {
-        if (!shouldPersist) {
-            return;
-        }
-        Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
-        HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION);
-        fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"),
-                new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
-        LOG.info("Archived configuration {}/{}", type, name);
-    }
-
-    /**
-     * @param type - Entity type to restore from persistent store
-     * @param name - Name of the entity to restore.
-     * @param <T>  - Actual entity object type
-     * @return - De-serialized entity object restored from persistent store
-     * @throws IOException     If any error in accessing the storage
-     * @throws FalconException
-     */
-    @SuppressWarnings("unchecked")
-    private synchronized <T extends Entity> T restore(EntityType type, String name)
-        throws IOException, FalconException {
-
-        InputStream in = fs.open(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"));
-        try {
-            return (T) type.getUnmarshaller().unmarshal(in);
-        } catch (JAXBException e) {
-            throw new StoreAccessException("Unable to unmarshal xml definition for " + type + "/" + name, e);
-        } finally {
-            in.close();
-            LOG.info("Restored configuration {}/{}", type, name);
-        }
-    }
-
-    public void cleanupUpdateInit() {
-        updatesInProgress.set(null);
-    }
-
-    @Override
-    public String getName() {
-        return this.getClass().getName();
-    }
-
-    @Override
-    public void destroy() {
-    }
-}
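
For readers tracing the removed store, a minimal sketch of its three-phase
update protocol (initiateUpdate, update, cleanupUpdateInit) follows. The
helper class StoreUpdateSketch and the try/finally placement are
illustrative assumptions, not code from the Falcon tree:

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;

public final class StoreUpdateSketch {
    private StoreUpdateSketch() {}

    // Replace an already-published feed definition using the store's
    // three-phase protocol: mark the update in progress, persist and
    // notify listeners, then clear the thread-local marker.
    public static void replaceFeed(Feed updatedFeed) throws FalconException {
        ConfigurationStore store = ConfigurationStore.get();
        store.initiateUpdate(updatedFeed);
        try {
            store.update(EntityType.FEED, updatedFeed);
        } finally {
            store.cleanupUpdateInit();
        }
    }
}

The thread-local updatesInProgress marker is what lets get() hand the
in-flight definition back to the updating thread while other threads keep
seeing the committed one.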

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java b/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java
deleted file mode 100644
index 28c5ac0..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/EntityAlreadyExistsException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Exception thrown when an entity being added is already present in the config store.
- */
-public class EntityAlreadyExistsException extends FalconException {
-
-    public EntityAlreadyExistsException(Exception e) {
-        super(e);
-    }
-
-    public EntityAlreadyExistsException(String message, Exception e) {
-        super(message, e);
-    }
-
-    public EntityAlreadyExistsException(String message) {
-        super(message);
-    }
-}
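
A hedged sketch of how a submitter might consume this exception;
SubmitSketch and its swallow-on-duplicate policy are illustrative, not
Falcon API:

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.store.EntityAlreadyExistsException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;

public final class SubmitSketch {
    private SubmitSketch() {}

    // Publish an entity, treating a duplicate submission as a no-op
    // instead of surfacing the failure to the caller.
    public static void submitIdempotently(EntityType type, Entity entity)
        throws FalconException {
        try {
            ConfigurationStore.get().publish(type, entity);
        } catch (EntityAlreadyExistsException ignore) {
            // Already registered; a changed definition must instead go
            // through initiateUpdate/update on the store.
        }
    }
}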

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
deleted file mode 100644
index a9b7617..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.resource.FeedLookupResult;
-import org.apache.falcon.service.ConfigurationChangeListener;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.falcon.util.FalconRadixUtils;
-import org.apache.falcon.util.RadixTree;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- *  A <Key, Value> store that maps a Feed's locations to FeedProperties.
- *
- * For example:
- * let's say a feed - <b>MyFeed</b> - is configured for two clusters - cluster1 and cluster2 - and has data locations
- * as below.
- * /projects/myprocess/data/${MONTH}-${DAY}-${HOUR}
- * /projects/myprocess/meta/${MONTH}-${DAY}-${HOUR}
- *
- * then the key-value store will look like below:
- * key1: /projects/myprocess/data/${MONTH}-${DAY}-${HOUR}
- * value1: [FeedProperties("cluster1", LocationType.DATA, "MyFeed"),
- *          FeedProperties("cluster2", LocationType.DATA, "MyFeed")
- *         ]
- *
- * key2: /projects/myprocess/meta/${MONTH}-${DAY}-${HOUR}
- * value2: [FeedProperties("cluster1", LocationType.META, "MyFeed"),
- *          FeedProperties("cluster2", LocationType.META, "MyFeed")
- *         ]
- *
- * It ensures that no two Feeds share the same location.
- * It can also be used for operations like:
- * <ul>
- *     <li>Find whether there is a feed which uses a given path as its location.</li>
- *     <li>Find the name of the feed, given its location.</li>
- * </ul>
- */
-public final class FeedLocationStore implements ConfigurationChangeListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FeedLocationStore.class);
-    protected final FeedPathStore<FeedLookupResult.FeedProperties> store = new
-            RadixTree<FeedLookupResult.FeedProperties>();
-
-    private static FeedLocationStore instance = new FeedLocationStore();
-
-    private FeedLocationStore(){
-    }
-
-    public static FeedLocationStore get(){
-        return instance;
-    }
-
-    @Override
-    public void onAdd(Entity entity) throws FalconException {
-        if (entity.getEntityType() == EntityType.FEED){
-            Feed feed = (Feed) entity;
-            List<Cluster> clusters = feed.getClusters().getClusters();
-            for(Cluster cluster: clusters) {
-                if (DeploymentUtil.getCurrentClusters().contains(cluster.getName())) {
-                    List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed,
-                            cluster.getName()), feed);
-                    if (clusterSpecificLocations != null) {
-                        for (Location location : clusterSpecificLocations) {
-                            if (location != null && StringUtils.isNotBlank(location.getPath())) {
-                                FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(
-                                        feed.getName(), location.getType(), cluster.getName());
-                                store.insert(StringUtils.trim(location.getPath()), value);
-                                LOG.debug("Inserted location: {} for feed: {} and cluster: {}",
-                                        location.getPath(), feed.getName(), cluster.getName());
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Delete the key (path) from the store when the feed is deleted.
-     * @param entity entity object
-     * @throws FalconException
-     */
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-        if (entity.getEntityType() == EntityType.FEED){
-
-            Feed feed = (Feed) entity;
-            List<Cluster> clusters = feed.getClusters().getClusters();
-            for(Cluster cluster: clusters){
-                List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed,
-                        cluster.getName()), feed);
-                if (clusterSpecificLocations != null) {
-                    for(Location location: clusterSpecificLocations){
-                        if (location != null && StringUtils.isNotBlank(location.getPath())){
-                            FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(feed.getName(),
-                                    location.getType(), cluster.getName());
-                            LOG.debug("Delete called for location: {} for feed: {} and cluster: {}",
-                                    location.getPath(), feed.getName(), cluster.getName());
-                            store.delete(location.getPath(), value);
-                            LOG.debug("Deleted location: {} for feed: {} and cluster: {}",
-                                    location.getPath(), feed.getName(), cluster.getName());
-                        }
-                    }
-                }
-            }
-        }
-
-    }
-
-    /**
-     * Delete the old path and insert the new path when the feed is updated.
-     * @param oldEntity old entity object
-     * @param newEntity updated entity object
-     * @throws FalconException if the new path already exists in the store.
-     */
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        onRemove(oldEntity);
-        onAdd(newEntity);
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-
-
-    public Collection<FeedLookupResult.FeedProperties> reverseLookup(String path) {
-        return store.find(path, new FalconRadixUtils.FeedRegexAlgorithm());
-    }
-}
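
A minimal usage sketch for the reverse lookup above, assuming the
FeedLocationStore singleton has been registered as a configstore listener
so its radix tree is populated; FeedLookupSketch is a hypothetical helper:

import org.apache.falcon.entity.store.FeedLocationStore;
import org.apache.falcon.resource.FeedLookupResult;

import java.util.Collection;

public final class FeedLookupSketch {
    private FeedLookupSketch() {}

    // Ask the store which feeds (and on which clusters) claim a concrete
    // path; returns true when at least one feed owns the location.
    public static boolean isPathOwned(String path) {
        Collection<FeedLookupResult.FeedProperties> owners =
                FeedLocationStore.get().reverseLookup(path);
        return owners != null && !owners.isEmpty();
    }
}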

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
deleted file mode 100644
index 1be12fe..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.falcon.util.FalconRadixUtils;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import java.util.Collection;
-
-/**
- * A <Key, Value> store that maps feed locations to values.
- *
- * @param <T>
- */
-public interface FeedPathStore<T> {
-
-    void insert(@Nullable String key, @Nonnull T value);
-
-    int getSize();
-
-    @Nullable
-    Collection<T> find(@Nonnull String key, @Nonnull FalconRadixUtils.INodeAlgorithm algorithm);
-
-    @Nullable
-    Collection<T> find(@Nonnull String key);
-
-    boolean delete(@Nonnull String key, @Nonnull T value);
-
-}
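
For contrast with the RadixTree used by FeedLocationStore, here is a
deliberately naive, map-backed sketch of this contract. It honors the
signatures above but treats keys as exact strings and ignores the supplied
matching algorithm, so it is a toy for illustration, not a drop-in
replacement:

import org.apache.falcon.entity.store.FeedPathStore;
import org.apache.falcon.util.FalconRadixUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class MapBackedFeedPathStore<T> implements FeedPathStore<T> {
    private final Map<String, List<T>> map = new ConcurrentHashMap<>();
    private int size = 0;

    @Override
    public synchronized void insert(@Nullable String key, @Nonnull T value) {
        if (key == null) {
            return; // a tolerant reading of the @Nullable key
        }
        map.computeIfAbsent(key.trim(), k -> new CopyOnWriteArrayList<T>()).add(value);
        size++;
    }

    @Override
    public synchronized int getSize() {
        return size;
    }

    @Override
    @Nullable
    public Collection<T> find(@Nonnull String key,
                              @Nonnull FalconRadixUtils.INodeAlgorithm algorithm) {
        return find(key); // exact match only; the algorithm is ignored here
    }

    @Override
    @Nullable
    public Collection<T> find(@Nonnull String key) {
        return map.get(key.trim());
    }

    @Override
    public synchronized boolean delete(@Nonnull String key, @Nonnull T value) {
        List<T> values = map.get(key.trim());
        if (values != null && values.remove(value)) {
            size--;
            return true;
        }
        return false;
    }
}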

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
deleted file mode 100644
index 318dc2e..0000000
--- a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.store;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Exception thrown when there is an issue accessing the persistent store.
- */
-public class StoreAccessException extends FalconException {
-
-    /**
-     * Constructs the exception with an error message and the underlying cause.
-     */
-    public StoreAccessException(String message, Exception e) {
-        super(message, e);
-    }
-
-    public StoreAccessException(Exception e) {
-        super(e);
-    }
-}