Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:26:26 UTC

[40/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
deleted file mode 100644
index bbfca68..0000000
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Helper methods for accessing process members.
- */
-public final class ProcessHelper {
-
-    private ProcessHelper() {}
-
-
-    public static Cluster getCluster(Process process, String clusterName) {
-        for (Cluster cluster : process.getClusters().getClusters()) {
-            if (cluster.getName().equals(clusterName)) {
-                return cluster;
-            }
-        }
-        return null;
-    }
-
-    public static String getProcessWorkflowName(String workflowName, String processName) {
-        return StringUtils.isEmpty(workflowName) ? processName + "-workflow" : workflowName;
-    }
-
-    public static Storage.TYPE getStorageType(org.apache.falcon.entity.v0.cluster.Cluster cluster,
-                                              Process process) throws FalconException {
-        Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
-        if (process.getInputs() == null && process.getOutputs() == null) {
-            return storageType;
-        }
-
-        if (process.getInputs() != null) {
-            for (Input input : process.getInputs().getInputs()) {
-                Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
-                storageType = FeedHelper.getStorageType(feed, cluster);
-                if (Storage.TYPE.TABLE == storageType) {
-                    break;
-                }
-            }
-        }
-
-        // If the input feeds' storage type is filesystem, check the storage type of the output feeds
-        if (process.getOutputs() != null && Storage.TYPE.FILESYSTEM == storageType) {
-            for (Output output : process.getOutputs().getOutputs()) {
-                Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-                storageType = FeedHelper.getStorageType(feed, cluster);
-                if (Storage.TYPE.TABLE == storageType) {
-                    break;
-                }
-            }
-        }
-
-        return storageType;
-    }
-
-    private static void validateProcessInstance(Process process, Date instanceTime,
-                                                org.apache.falcon.entity.v0.cluster.Cluster cluster) {
-        //validate the cluster
-        Cluster processCluster = getCluster(process, cluster.getName());
-        if (processCluster == null) {
-            throw new IllegalArgumentException("Cluster provided: " + cluster.getName()
-                    + " is not a valid cluster for the process: " + process.getName());
-        }
-
-        // check if instanceTime is in validity range
-        if (instanceTime.before(processCluster.getValidity().getStart())
-                || !instanceTime.before(processCluster.getValidity().getEnd())) {
-            throw new IllegalArgumentException("Instance time provided: " + instanceTime
-                    + " is not in validity range of process: " + process.getName()
-                    + "on cluster: " + cluster.getName());
-        }
-
-        // check instanceTime is valid on the basis of startTime and frequency
-        Date nextInstance = EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
-                process.getFrequency(), process.getTimezone(), instanceTime);
-        if (!nextInstance.equals(instanceTime)) {
-            throw new IllegalArgumentException("Instance time provided: " + instanceTime
-                    + " for process: " + process.getName() + " is not a valid instance time on cluster: "
-                    + cluster.getName() + " on the basis of startDate and frequency");
-        }
-    }
-
-    /**
-     * Given a process instance, returns the feed instances which are used as input for this process instance.
-     *
-     * @param process            given process
-     * @param instanceTime       nominal time of the process instance
-     * @param cluster            cluster for the process instance
-     * @param allowOptionalFeeds switch to indicate whether optional feeds should be considered in input feeds.
-     * @return Set of input feed instances which are consumed by the given process instance.
-     * @throws org.apache.falcon.FalconException
-     */
-    public static Set<SchedulableEntityInstance> getInputFeedInstances(Process process, Date instanceTime,
-               org.apache.falcon.entity.v0.cluster.Cluster cluster, boolean allowOptionalFeeds) throws FalconException {
-
-        // validate the inputs
-        validateProcessInstance(process, instanceTime, cluster);
-
-        Set<SchedulableEntityInstance> result = new HashSet<>();
-        if (process.getInputs() != null) {
-            ConfigurationStore store = ConfigurationStore.get();
-            for (Input i : process.getInputs().getInputs()) {
-                if (i.isOptional() && !allowOptionalFeeds) {
-                    continue;
-                }
-                Feed feed = store.get(EntityType.FEED, i.getFeed());
-                // inputStart is process instance time + (now - startTime)
-                ExpressionHelper evaluator = ExpressionHelper.get();
-                ExpressionHelper.setReferenceDate(instanceTime);
-                Date inputInstanceStartDate = evaluator.evaluate(i.getStart(), Date.class);
-                Date inputInstanceEndDate = evaluator.evaluate(i.getEnd(), Date.class);
-                List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
-                        inputInstanceStartDate, inputInstanceEndDate);
-                SchedulableEntityInstance instance;
-                for (Date time : instanceTimes) {
-                    instance = new SchedulableEntityInstance(feed.getName(), cluster.getName(), time, EntityType.FEED);
-                    instance.setTags(SchedulableEntityInstance.INPUT);
-                    result.add(instance);
-                }
-            }
-        }
-        return result;
-    }
-
-    public static Set<SchedulableEntityInstance> getOutputFeedInstances(Process process, Date instanceTime,
-                                        org.apache.falcon.entity.v0.cluster.Cluster cluster) throws FalconException {
-        Set<SchedulableEntityInstance> result = new HashSet<>();
-
-        // validate the inputs
-        validateProcessInstance(process, instanceTime, cluster);
-
-        if (process.getOutputs() != null && process.getOutputs().getOutputs() != null) {
-
-            ExpressionHelper.setReferenceDate(instanceTime);
-            ExpressionHelper evaluator = ExpressionHelper.get();
-            SchedulableEntityInstance candidate;
-            ConfigurationStore store = ConfigurationStore.get();
-            for (Output output : process.getOutputs().getOutputs()) {
-
-                Date outputInstance = evaluator.evaluate(output.getInstance(), Date.class);
-                // find the feed
-                Feed feed = store.get(EntityType.FEED, output.getFeed());
-                org.apache.falcon.entity.v0.feed.Cluster fCluster = FeedHelper.getCluster(feed, cluster.getName());
-                outputInstance = EntityUtil.getPreviousInstanceTime(fCluster.getValidity().getStart(),
-                        feed.getFrequency(), feed.getTimezone(), outputInstance);
-                candidate = new SchedulableEntityInstance(output.getFeed(), cluster.getName(), outputInstance,
-                        EntityType.FEED);
-                candidate.setTags(SchedulableEntityInstance.OUTPUT);
-                result.add(candidate);
-            }
-        }
-        return result;
-    }
-}
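
For orientation, a minimal sketch of how the removed ProcessHelper was typically
driven. The entity names are hypothetical, both entities are assumed to be
registered in the ConfigurationStore, and the nominal time must fall inside the
cluster validity window and on the process frequency grid (otherwise
validateProcessInstance() throws IllegalArgumentException):

    Process process = ConfigurationStore.get().get(EntityType.PROCESS, "sample-process");
    org.apache.falcon.entity.v0.cluster.Cluster cluster =
            ConfigurationStore.get().get(EntityType.CLUSTER, "primary-cluster");

    java.text.SimpleDateFormat utc = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
    utc.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
    Date nominalTime = utc.parse("2016-03-01T00:00Z");  // on the frequency grid

    // feed instances consumed by this run (optional inputs skipped):
    Set<SchedulableEntityInstance> inputs =
            ProcessHelper.getInputFeedInstances(process, nominalTime, cluster, false);
    // feed instances produced by this run:
    Set<SchedulableEntityInstance> outputs =
            ProcessHelper.getOutputFeedInstances(process, nominalTime, cluster);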

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/Storage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/Storage.java b/common/src/main/java/org/apache/falcon/entity/Storage.java
deleted file mode 100644
index 3dc8f67..0000000
--- a/common/src/main/java/org/apache/falcon/entity/Storage.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.Path;
-
-import java.util.Date;
-import java.util.List;
-
-/**
- * A class to encapsulate the storage for a given feed which can either be
- * expressed as a path on the file system or a table in a catalog.
- */
-public interface Storage extends Configurable {
-
-    String DOLLAR_EXPR_START_REGEX = "\\$\\{";
-    String QUESTION_EXPR_START_REGEX = "\\?\\{";
-    String EXPR_CLOSE_REGEX = "\\}";
-
-    /**
-     * URI-friendly normalized forms of the expression markers.
-     */
-    String DOLLAR_EXPR_START_NORMALIZED = "_D__START_";
-    String EXPR_CLOSE_NORMALIZED = "_CLOSE_";
-
-    /**
-     * Enumeration for the various storage types.
-     */
-    enum TYPE {FILESYSTEM, TABLE}
-
-    /**
-     * Return the type of storage.
-     *
-     * @return storage type
-     */
-    TYPE getType();
-
-    /**
-     * Return the uri template.
-     *
-     * @return uri template
-     */
-    String getUriTemplate();
-
-    /**
-     * Return the uri template for a given location type.
-     *
-     * @param locationType type of location, applies only to filesystem type
-     * @return uri template
-     */
-    String getUriTemplate(LocationType locationType);
-
-    /**
-     * Check for equality of this instance against the one in question.
-     *
-     * @param toCompareAgainst instance to compare
-     * @return true if identical, false otherwise
-     * @throws FalconException on error
-     */
-    boolean isIdentical(Storage toCompareAgainst) throws FalconException;
-
-    /**
-     * Check the permission on the storage, regarding owner/group/permission coming from ACL.
-     *
-     * @param acl the ACL defined in the entity.
-     * @throws FalconException if the permissions are not valid.
-     */
-    void validateACL(AccessControlList acl) throws FalconException;
-
-    /**
-     * Get the feed instance listing for a feed within a date range.
-     */
-    List<FeedInstanceStatus> getListing(Feed feed, String cluster, LocationType locationType,
-                                        Date start, Date end) throws FalconException;
-
-
-    /**
-     * Checks the availability status for a given feed instance.
-     */
-    FeedInstanceStatus.AvailabilityStatus getInstanceAvailabilityStatus(Feed feed, String clusterName,
-                                                                        LocationType locationType,
-                                                                        Date instanceTime) throws FalconException;
-
-
-    /**
-     * Delete the instances of the feeds which are older than the retentionLimit specified.
-     *
-     * @param retentionLimit retention limit of the feed, e.g. hours(5).
-     * @param timeZone timezone of the feed definition.
-     * @param logFilePath log file used to record the deleted instances.
-     * @return StringBuilder containing a comma-separated list of dates for the deleted instances.
-     * @throws FalconException
-     */
-    StringBuilder evict(String retentionLimit, String timeZone, Path logFilePath) throws FalconException;
-}
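
The *_REGEX/*_NORMALIZED constants above pair up to make EL markers URI-safe; a
small sketch of the round trip they enable, assuming they are applied as matched
replaceAll pairs (which is what the names suggest):

    String template = "hdfs://nn:8020/data/${YEAR}/${MONTH}";
    String normalized = template
            .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.DOLLAR_EXPR_START_NORMALIZED)
            .replaceAll(Storage.EXPR_CLOSE_REGEX, Storage.EXPR_CLOSE_NORMALIZED);
    // -> hdfs://nn:8020/data/_D__START_YEAR_CLOSE_/_D__START_MONTH_CLOSE_
    String restored = normalized
            .replace(Storage.DOLLAR_EXPR_START_NORMALIZED, "${")
            .replace(Storage.EXPR_CLOSE_NORMALIZED, "}");
    // -> hdfs://nn:8020/data/${YEAR}/${MONTH}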

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java b/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
deleted file mode 100644
index c58be64..0000000
--- a/common/src/main/java/org/apache/falcon/entity/WorkflowNameBuilder.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.entity;
-
-import org.apache.falcon.Pair;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Convenience builder for workflow names.
- * @param <T> the entity type the workflow name is built for
- */
-public class WorkflowNameBuilder<T extends Entity> {
-    private static final String PREFIX = "FALCON";
-
-    private T entity;
-    private Tag tag;
-    private List<String> suffixes;
-
-    public WorkflowNameBuilder(T entity) {
-        this.entity = entity;
-    }
-
-    public void setTag(Tag tag) {
-        this.tag = tag;
-    }
-
-    public void setSuffixes(List<String> suffixes) {
-        this.suffixes = suffixes;
-    }
-
-    public WorkflowName getWorkflowName() {
-        return new WorkflowName(PREFIX, entity.getEntityType().name(),
-                tag == null ? null : tag.name(), entity.getName(),
-                suffixes == null ? new ArrayList<String>() : suffixes);
-    }
-
-    public Tag getWorkflowTag(String workflowName) {
-        return WorkflowName.getTagAndSuffixes(workflowName) == null ? null
-                : WorkflowName.getTagAndSuffixes(workflowName).first;
-    }
-
-    public String getWorkflowSuffixes(String workflowName) {
-        return WorkflowName.getTagAndSuffixes(workflowName) == null ? ""
-                : WorkflowName.getTagAndSuffixes(workflowName).second;
-    }
-
-    /**
-     * Workflow name.
-     */
-    public static class WorkflowName {
-        private static final String SEPARATOR = "_";
-        private static final Pattern WF_NAME_PATTERN;
-
-        private String prefix;
-        private String entityType;
-        private String tag;
-        private String entityName;
-        private List<String> suffixes;
-
-        static {
-            StringBuilder typePattern = new StringBuilder("(");
-            for (EntityType type : EntityType.values()) {
-                typePattern.append(type.name());
-                typePattern.append("|");
-            }
-            typePattern = typePattern.deleteCharAt(typePattern.length() - 1);
-            typePattern.append(")");
-            StringBuilder tagsPattern = new StringBuilder("(");
-            for (Tag tag : Tag.values()) {
-                tagsPattern.append(tag.name());
-                tagsPattern.append("|");
-            }
-            tagsPattern = tagsPattern.deleteCharAt(tagsPattern.length() - 1);
-            tagsPattern.append(")");
-
-            String name = "([a-zA-Z][\\-a-zA-Z0-9]*)";
-
-            String suffix = "([_A-Za-z0-9-.]*)";
-
-            String namePattern = PREFIX + SEPARATOR + typePattern + SEPARATOR + tagsPattern
-                    + SEPARATOR + name + suffix;
-
-            WF_NAME_PATTERN = Pattern.compile(namePattern);
-        }
-
-        public WorkflowName(String prefix, String entityType, String tag,
-                            String entityName, List<String> suffixes) {
-            this.prefix = prefix;
-            this.entityType = entityType;
-            this.tag = tag;
-            this.entityName = entityName;
-            this.suffixes = suffixes;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append(prefix).append(SEPARATOR).append(entityType)
-                    .append(tag == null ? "" : SEPARATOR + tag)
-                    .append(SEPARATOR).append(entityName);
-
-            for (String suffix : suffixes) {
-                builder.append(SEPARATOR).append(suffix);
-            }
-
-            return builder.toString();
-        }
-
-        public static Pair<Tag, String> getTagAndSuffixes(String workflowName) {
-            Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
-            if (matcher.matches()) {
-                matcher.reset();
-                if (matcher.find()) {
-                    String tag = matcher.group(2);
-                    String suffixes = matcher.group(4);
-                    return new Pair<>(Tag.valueOf(tag), suffixes);
-                }
-            }
-            return null;
-        }
-
-        public static Pair<String, EntityType> getEntityNameAndType(String workflowName) {
-            Matcher matcher = WF_NAME_PATTERN.matcher(workflowName);
-            if (matcher.matches()) {
-                matcher.reset();
-                if (matcher.find()) {
-                    String type = matcher.group(1);
-                    String name = matcher.group(3);
-                    return new Pair<>(name, EntityType.valueOf(type));
-                }
-            }
-            return null;
-        }
-    }
-}
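
A short sketch of the builder and the reverse parse; the process name is made up
and Tag.DEFAULT is assumed to be one of the org.apache.falcon.Tag values:

    Process process = new Process();
    process.setName("sample-process");

    WorkflowNameBuilder<Process> builder = new WorkflowNameBuilder<>(process);
    builder.setTag(Tag.DEFAULT);
    String wfName = builder.getWorkflowName().toString();
    // -> FALCON_PROCESS_DEFAULT_sample-process

    Pair<String, EntityType> nameAndType =
            WorkflowNameBuilder.WorkflowName.getEntityNameAndType(wfName);
    // -> ("sample-process", EntityType.PROCESS)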

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
deleted file mode 100644
index 51568fb..0000000
--- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.entity.common;
-
-import java.util.Calendar;
-import java.util.regex.Pattern;
-
-/**
- * Helper to map a feed path to its time components.
- */
-public final class FeedDataPath {
-
-    private FeedDataPath() {}
-
-    /**
-     * Standard variables for feed time components.
-     */
-    public enum VARS {
-        YEAR("([0-9]{4})", Calendar.YEAR, 4),
-        MONTH("(0[1-9]|1[0-2])", Calendar.MONTH, 2),
-        DAY("(0[1-9]|1[0-9]|2[0-9]|3[0-1])", Calendar.DAY_OF_MONTH, 2),
-        HOUR("([0-1][0-9]|2[0-4])", Calendar.HOUR_OF_DAY, 2),
-        MINUTE("([0-5][0-9]|60)", Calendar.MINUTE, 2);
-
-        private final Pattern pattern;
-        private final String valuePattern;
-        private final int calendarField;
-        private final int valueSize;
-
-        private VARS(String patternRegularExpression, int calField, int numDigits) {
-            pattern = Pattern.compile("\\$\\{" + name() + "\\}");
-            this.valuePattern = patternRegularExpression;
-            this.calendarField = calField;
-            this.valueSize = numDigits;
-        }
-
-        public String getValuePattern() {
-            return valuePattern;
-        }
-
-        public String regex() {
-            return pattern.pattern();
-        }
-
-        public int getCalendarField() {
-            return calendarField;
-        }
-
-        public int getValueSize() {
-            return valueSize;
-        }
-
-        public void setCalendar(Calendar cal, int value) {
-            if (this == MONTH) {
-                cal.set(calendarField, value - 1);
-            } else {
-                cal.set(calendarField, value);
-            }
-        }
-
-        public static VARS from(String str) {
-            for (VARS var : VARS.values()) {
-                if (var.pattern.matcher(str).matches()) {
-                    return var;
-                }
-            }
-            return null;
-        }
-    }
-
-    public static final Pattern PATTERN = Pattern.compile(VARS.YEAR.regex()
-            + "|" + VARS.MONTH.regex() + "|" + VARS.DAY.regex() + "|"
-            + VARS.HOUR.regex() + "|" + VARS.MINUTE.regex());
-}
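
For context: PATTERN matches the raw ${VAR} markers in an unmaterialized feed
path, and VARS.from() maps a single marker back to its enum constant, e.g.:

    String location = "/data/clicks/${YEAR}/${MONTH}/${DAY}";
    java.util.regex.Matcher matcher = FeedDataPath.PATTERN.matcher(location);
    while (matcher.find()) {
        FeedDataPath.VARS var = FeedDataPath.VARS.from(matcher.group());
        System.out.println(matcher.group() + " -> " + var.getValuePattern());
        // ${YEAR} -> ([0-9]{4}), ${MONTH} -> (0[1-9]|1[0-2]), ...
    }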

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java b/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java
deleted file mode 100644
index 0cf2722..0000000
--- a/common/src/main/java/org/apache/falcon/entity/lock/MemoryLocks.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.entity.lock;
-
-import org.apache.falcon.entity.v0.Entity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * In-memory locking of entities, keyed by entity type and name.
- */
-public final class MemoryLocks {
-    private static final Logger LOG = LoggerFactory.getLogger(MemoryLocks.class);
-    private static ConcurrentHashMap<String, Boolean> locks = new ConcurrentHashMap<String, Boolean>();
-
-    private static MemoryLocks instance = new MemoryLocks();
-
-    private MemoryLocks() {
-    }
-
-    public static MemoryLocks getInstance() {
-        return instance;
-    }
-
-    /**
-     * Obtain a lock for an entity.
-     *
-     * @param entity entity object.
-     * @param command command for which the lock is being acquired (used for logging).
-     * @return true if the lock was obtained, false otherwise.
-     */
-    public boolean acquireLock(Entity entity, String command) {
-        boolean lockObtained = false;
-        String entityName = getLockKey(entity);
-
-        Boolean putResponse = locks.putIfAbsent(entityName, true);
-        if (putResponse == null || !putResponse) {
-            LOG.info("Lock acquired for {} on {} by {}",
-                    command, entity.toShortString(), Thread.currentThread().getName());
-            lockObtained = true;
-        }
-        return lockObtained;
-    }
-
-    /**
-     * Release the lock for an entity.
-     *
-     * @param entity entity object.
-     */
-    public void releaseLock(Entity entity) {
-        String entityName = getLockKey(entity);
-
-        locks.remove(entityName);
-        LOG.info("Successfully released lock on {} by {}",
-                entity.toShortString(), Thread.currentThread().getName());
-    }
-
-    private String getLockKey(Entity entity) {
-        return entity.getEntityType().toString() + "." + entity.getName();
-    }
-
-
-}
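
A typical acquire/release pattern around the singleton; `entity` stands for any
configured Entity, and the "SCHEDULE" command string is only used for logging:

    MemoryLocks locks = MemoryLocks.getInstance();
    if (!locks.acquireLock(entity, "SCHEDULE")) {
        throw new FalconException("Command already in progress for " + entity.toShortString());
    }
    try {
        // mutate or schedule the entity while holding its per-name lock
    } finally {
        locks.releaseLock(entity);
    }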

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
deleted file mode 100644
index bef4b39..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.commons.lang.Validate;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.catalog.CatalogServiceFactory;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.ACL;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
-import org.apache.falcon.entity.v0.cluster.Interface;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.cluster.Location;
-import org.apache.falcon.entity.v0.cluster.Properties;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.WorkflowEngineFactory;
-import org.apache.falcon.workflow.util.OozieConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.ConnectionFactory;
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.List;
-
-/**
- * Parser that parses cluster entity definition.
- */
-public class ClusterEntityParser extends EntityParser<Cluster> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ClusterEntityParser.class);
-
-    public ClusterEntityParser() {
-        super(EntityType.CLUSTER);
-    }
-
-    @Override
-    public void validate(Cluster cluster) throws ValidationException {
-        // validate interface schemes first so that we fail early
-        validateScheme(cluster, Interfacetype.READONLY);
-        validateScheme(cluster, Interfacetype.WRITE);
-        validateScheme(cluster, Interfacetype.WORKFLOW);
-        // User may choose to disable job completion notifications
-        if (ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING) != null) {
-            validateScheme(cluster, Interfacetype.MESSAGING);
-        }
-        if (CatalogServiceFactory.isEnabled()
-                && ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY) != null) {
-            validateScheme(cluster, Interfacetype.REGISTRY);
-        }
-
-        validateACL(cluster);
-
-        if (!EntityUtil.responsibleFor(cluster.getColo())) {
-            return;
-        }
-
-        validateReadInterface(cluster);
-        validateWriteInterface(cluster);
-        validateExecuteInterface(cluster);
-        validateWorkflowInterface(cluster);
-        validateMessagingInterface(cluster);
-        validateRegistryInterface(cluster);
-        validateLocations(cluster);
-        validateProperties(cluster);
-    }
-
-    private void validateScheme(Cluster cluster, Interfacetype interfacetype)
-        throws ValidationException {
-        final String endpoint = ClusterHelper.getInterface(cluster, interfacetype).getEndpoint();
-        URI uri = new Path(endpoint).toUri();
-        if (uri.getScheme() == null) {
-            if (Interfacetype.WORKFLOW == interfacetype
-                    && uri.toString().equals(OozieConstants.LOCAL_OOZIE)) {
-                return;
-            }
-            throw new ValidationException("Cannot get valid scheme for interface: "
-                    + interfacetype + " of cluster: " + cluster.getName());
-        }
-    }
-
-    private void validateReadInterface(Cluster cluster) throws ValidationException {
-        final String readOnlyStorageUrl = ClusterHelper.getReadOnlyStorageUrl(cluster);
-        LOG.info("Validating read interface: {}", readOnlyStorageUrl);
-
-        validateFileSystem(cluster, readOnlyStorageUrl);
-    }
-
-    private void validateWriteInterface(Cluster cluster) throws ValidationException {
-        final String writeStorageUrl = ClusterHelper.getStorageUrl(cluster);
-        LOG.info("Validating write interface: {}", writeStorageUrl);
-
-        validateFileSystem(cluster, writeStorageUrl);
-    }
-
-    private void validateFileSystem(Cluster cluster, String storageUrl) throws ValidationException {
-        try {
-            Configuration conf = new Configuration();
-            conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
-            conf.setInt("ipc.client.connect.max.retries", 10);
-
-            if (UserGroupInformation.isSecurityEnabled()) {
-                String nameNodePrincipal = ClusterHelper.getPropertyValue(cluster, SecurityUtil.NN_PRINCIPAL);
-                Validate.notEmpty(nameNodePrincipal,
-                    "Cluster definition missing required namenode credential property: " + SecurityUtil.NN_PRINCIPAL);
-
-                conf.set(SecurityUtil.NN_PRINCIPAL, nameNodePrincipal);
-            }
-
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
-            fs.exists(new Path("/"));
-        } catch (Exception e) {
-            throw new ValidationException("Invalid storage server or port: " + storageUrl
-                    + ", " + e.getMessage(), e);
-        }
-    }
-
-    private void validateExecuteInterface(Cluster cluster) throws ValidationException {
-        String executeUrl = ClusterHelper.getMREndPoint(cluster);
-        LOG.info("Validating execute interface: {}", executeUrl);
-
-        try {
-            HadoopClientFactory.get().validateJobClient(executeUrl);
-        } catch (IOException e) {
-            throw new ValidationException("Invalid Execute server or port: " + executeUrl, e);
-        }
-    }
-
-    protected void validateWorkflowInterface(Cluster cluster) throws ValidationException {
-        final String workflowUrl = ClusterHelper.getOozieUrl(cluster);
-        LOG.info("Validating workflow interface: {}", workflowUrl);
-        if (OozieConstants.LOCAL_OOZIE.equals(workflowUrl)) {
-            return;
-        }
-        try {
-            if (!WorkflowEngineFactory.getWorkflowEngine().isAlive(cluster)) {
-                throw new ValidationException("Unable to reach Workflow server:" + workflowUrl);
-            }
-        } catch (FalconException e) {
-            throw new ValidationException("Invalid Workflow server or port: " + workflowUrl, e);
-        }
-    }
-
-    protected void validateMessagingInterface(Cluster cluster) throws ValidationException {
-        // Validate only if the user has specified a messaging interface
-        final Interface messagingInterface = ClusterHelper.getInterface(cluster, Interfacetype.MESSAGING);
-        if (messagingInterface == null) {
-            LOG.info("Messaging service is not enabled for cluster: {}", cluster.getName());
-            return;
-        }
-
-        final String messagingUrl = ClusterHelper.getMessageBrokerUrl(cluster);
-        final String implementation = StartupProperties.get().getProperty("broker.impl.class",
-                "org.apache.activemq.ActiveMQConnectionFactory");
-        LOG.info("Validating messaging interface: {}, implementation: {}", messagingUrl, implementation);
-
-        try {
-            @SuppressWarnings("unchecked")
-            Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)
-                    getClass().getClassLoader().loadClass(implementation);
-            ConnectionFactory connectionFactory = clazz.getConstructor(
-                    String.class, String.class, String.class).newInstance("", "", messagingUrl);
-            connectionFactory.createConnection();
-        } catch (Exception e) {
-            throw new ValidationException("Invalid Messaging server or port: " + messagingUrl
-                    + " for: " + implementation, e);
-        }
-    }
-
-    protected void validateRegistryInterface(Cluster cluster) throws ValidationException {
-        final boolean isCatalogRegistryEnabled = CatalogServiceFactory.isEnabled();
-        if (!isCatalogRegistryEnabled) {
-            return;  // ignore the registry interface for backwards compatibility
-        }
-
-        // continue validation only if a catalog service is provided
-        final Interface catalogInterface = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY);
-        if (catalogInterface == null) {
-            LOG.info("Catalog service is not enabled for cluster: {}", cluster.getName());
-            return;
-        }
-
-        final String catalogUrl = catalogInterface.getEndpoint();
-        LOG.info("Validating catalog registry interface: {}", catalogUrl);
-
-        try {
-            Configuration clusterConf = ClusterHelper.getConfiguration(cluster);
-            if (UserGroupInformation.isSecurityEnabled()) {
-                String metaStorePrincipal = clusterConf.get(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
-                Validate.notEmpty(metaStorePrincipal,
-                        "Cluster definition missing required metastore credential property: "
-                                + SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
-            }
-
-            if (!CatalogServiceFactory.getCatalogService().isAlive(clusterConf, catalogUrl)) {
-                throw new ValidationException("Unable to reach Catalog server:" + catalogUrl);
-            }
-        } catch (FalconException e) {
-            throw new ValidationException("Invalid Catalog server or port: " + catalogUrl, e);
-        }
-    }
-
-    /**
-     * Validate ACL if authorization is enabled.
-     *
-     * @param cluster cluster entity
-     * @throws ValidationException
-     */
-    private void validateACL(Cluster cluster) throws ValidationException {
-        if (isAuthorizationDisabled) {
-            return;
-        }
-
-        // Validate that the entity owner is the logged-in, authenticated user if authorization is enabled
-        final ACL clusterACL = cluster.getACL();
-        if (clusterACL == null) {
-            throw new ValidationException("Cluster ACL cannot be empty for:  " + cluster.getName());
-        }
-
-        validateACLOwnerAndGroup(clusterACL);
-
-        try {
-            authorize(cluster.getName(), clusterACL);
-        } catch (AuthorizationException e) {
-            throw new ValidationException(e);
-        }
-    }
-
-    /**
-     * Validate that the locations on the cluster exist, with permissions that
-     * allow the user to write to those directories.
-     *
-     * @param cluster cluster entity
-     * @throws ValidationException
-     */
-    protected void validateLocations(Cluster cluster) throws ValidationException {
-        Configuration conf = ClusterHelper.getConfiguration(cluster);
-        FileSystem fs;
-        try {
-            fs = HadoopClientFactory.get().createFalconFileSystem(conf);
-        } catch (FalconException e) {
-            throw new ValidationException("Unable to get file system handle for cluster " + cluster.getName(), e);
-        }
-
-        Location stagingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.STAGING);
-        if (stagingLocation == null) {
-            throw new ValidationException(
-                    "Unable to find the mandatory location of name: " + ClusterLocationType.STAGING.value()
-                            + " for cluster " + cluster.getName());
-        } else {
-            checkPathOwnerAndPermission(cluster.getName(), stagingLocation.getPath(), fs,
-                    HadoopClientFactory.ALL_PERMISSION);
-            if (!ClusterHelper.checkWorkingLocationExists(cluster)) {
-                // Create the working location as a subdir of the staging dir with perms 755 (FALCON-910)
-                createWorkingDirUnderStaging(fs, cluster, stagingLocation);
-            } else {
-                Location workingLocation = ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING);
-                if (stagingLocation.getPath().equals(workingLocation.getPath())) {
-                    throw new ValidationException(
-                            "Location with name: " + stagingLocation.getName().value() + " and " + workingLocation
-                                    .getName().value() + " cannot have same path: " + stagingLocation.getPath()
-                                    + " for cluster :" + cluster.getName());
-                } else {
-                    checkPathOwnerAndPermission(cluster.getName(), workingLocation.getPath(), fs,
-                            HadoopClientFactory.READ_EXECUTE_PERMISSION);
-                }
-            }
-            // Create staging subdirs falcon/workflows/feed and falcon/workflows/process : Falcon-1647
-            createStagingSubdirs(fs, cluster, stagingLocation,
-                    "falcon/workflows/feed", HadoopClientFactory.ALL_PERMISSION);
-            createStagingSubdirs(fs, cluster, stagingLocation,
-                    "falcon/workflows/process", HadoopClientFactory.ALL_PERMISSION);
-        }
-    }
-
-    private void createWorkingDirUnderStaging(FileSystem fs, Cluster cluster,
-                                              Location stagingLocation) throws ValidationException {
-        Path workingDirPath = new Path(stagingLocation.getPath(), ClusterHelper.WORKINGDIR);
-        try {
-            if (!fs.exists(workingDirPath)) {  // create the working dir under staging if it does not exist yet
-                HadoopClientFactory.mkdirs(fs, workingDirPath, HadoopClientFactory.READ_EXECUTE_PERMISSION);
-            } else {
-                if (fs.isDirectory(workingDirPath)) {
-                    FsPermission workingPerms = fs.getFileStatus(workingDirPath).getPermission();
-                    if (!workingPerms.equals(HadoopClientFactory.READ_EXECUTE_PERMISSION)) { //perms check
-                        throw new ValidationException(
-                                "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:"
-                                        + stagingLocation.getPath()
-                                        + " when staging location not specified with "
-                                        + HadoopClientFactory.READ_EXECUTE_PERMISSION.toString() + " got "
-                                        + workingPerms.toString());
-                    }
-                } else {
-                    throw new ValidationException(
-                            "Falcon needs subdir " + ClusterHelper.WORKINGDIR + " inside staging dir:"
-                                    + stagingLocation.getPath()
-                                    + " when staging location not specified. Got a file at " + workingDirPath
-                                    .toString());
-                }
-            }
-        } catch (IOException e) {
-            throw new ValidationException(
-                    "Unable to create path for " + workingDirPath.toString() + " with path: "
-                            + workingDirPath.toString() + " for cluster " + cluster.getName(), e);
-        }
-    }
-
-    private void createStagingSubdirs(FileSystem fs, Cluster cluster, Location stagingLocation,
-                                      String path, FsPermission permission) throws ValidationException {
-        Path subdirPath = new Path(stagingLocation.getPath(), path);
-        try {
-            HadoopClientFactory.mkdirs(fs, subdirPath, permission);
-        } catch (IOException e) {
-            throw new ValidationException(
-                    "Unable to create path "
-                            + subdirPath.toString() + " for cluster " + cluster.getName(), e);
-        }
-    }
-
-    protected void validateProperties(Cluster cluster) throws ValidationException {
-        Properties properties = cluster.getProperties();
-        if (properties == null) {
-            return; // Cluster has no properties to validate.
-        }
-
-        List<Property> propertyList = cluster.getProperties().getProperties();
-        HashSet<String> propertyKeys = new HashSet<String>();
-        for (Property prop : propertyList) {
-            if (StringUtils.isBlank(prop.getName())) {
-                throw new ValidationException("Property name and value cannot be empty for Cluster: "
-                        + cluster.getName());
-            }
-            if (!propertyKeys.add(prop.getName())) {
-                throw new ValidationException("Multiple properties with same name found for Cluster: "
-                        + cluster.getName());
-            }
-        }
-    }
-
-    private void checkPathOwnerAndPermission(String clusterName, String location, FileSystem fs,
-            FsPermission expectedPermission) throws ValidationException {
-
-        Path locationPath = new Path(location);
-        try {
-            if (!fs.exists(locationPath)) {
-                throw new ValidationException("Location " + location + " for cluster " + clusterName + " must exist.");
-            }
-
-            // falcon owns this path on each cluster
-            final String loginUser = UserGroupInformation.getLoginUser().getShortUserName();
-            FileStatus fileStatus = fs.getFileStatus(locationPath);
-            final String locationOwner = fileStatus.getOwner();
-            if (!locationOwner.equals(loginUser)) {
-                LOG.error("Owner of the location {} is {} for cluster {}. Current user {} is not the owner of the "
-                        + "location.", locationPath, locationOwner, clusterName, loginUser);
-                throw new ValidationException("Path [" + locationPath + "] on the cluster [" + clusterName + "] has "
-                        + "owner [" + locationOwner + "]. Current user [" + loginUser + "] is not the owner of the "
-                        + "path");
-            }
-            String errorMessage = "Path " + locationPath + " has permissions: " + fileStatus.getPermission().toString()
-                    + ", should be " + expectedPermission;
-            if (fileStatus.getPermission().toShort() != expectedPermission.toShort()) {
-                LOG.error(errorMessage);
-                throw new ValidationException(errorMessage);
-            }
-            // try to list to see if the user is able to write to this folder
-            fs.listStatus(locationPath);
-        } catch (IOException e) {
-            throw new ValidationException(
-                    "Unable to validate the location with path: " + location + " for cluster:" + clusterName
-                            + " due to transient failures ", e);
-        }
-    }
-}
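
A sketch of how the parser is usually obtained and driven; EntityParserFactory
and the parse(InputStream) entry point come from the surrounding codebase rather
than this diff, so treat the wiring as an assumption:

    ClusterEntityParser parser =
            (ClusterEntityParser) EntityParserFactory.getParser(EntityType.CLUSTER);
    Cluster cluster = parser.parse(clusterXmlStream);  // unmarshal + schema validation
    parser.validate(cluster);  // interface, location, ACL and property checks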

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java b/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java
deleted file mode 100644
index 18ae754..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/CrossEntityValidations.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Validity;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.util.DateUtil;
-
-import java.util.Date;
-
-/**
- * Validation helper functions to validate across process, feed and cluster definitions.
- */
-public final class CrossEntityValidations {
-
-    private CrossEntityValidations() {}
-
-    public static void validateInstanceRange(Process process, Input input, Feed feed) throws FalconException {
-
-        try {
-            for (Cluster cluster : process.getClusters().getClusters()) {
-                String clusterName = cluster.getName();
-                org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed,
-                        clusterName).getValidity();
-
-                // Optional end date: default to DateUtil.NEVER when unset
-                if (feedValidity.getEnd() == null) {
-                    feedValidity.setEnd(DateUtil.NEVER);
-                }
-
-                Date feedStart = feedValidity.getStart();
-                Date feedEnd = feedValidity.getEnd();
-
-                String instStartEL = input.getStart();
-                String instEndEL = input.getEnd();
-                ExpressionHelper evaluator = ExpressionHelper.get();
-
-                Validity processValidity = ProcessHelper.getCluster(process, clusterName).getValidity();
-                ExpressionHelper.setReferenceDate(processValidity.getStart());
-                Date instStart = evaluator.evaluate(instStartEL, Date.class);
-                Date instEnd = evaluator.evaluate(instEndEL, Date.class);
-                if (instStart.before(feedStart)) {
-                    throw new ValidationException("Start instance  " + instStartEL + " of feed " + feed.getName()
-                            + " is before the start of feed " + feedValidity.getStart() + " for cluster "
-                            + clusterName);
-                }
-
-                if (instEnd.before(instStart)) {
-                    throw new ValidationException("End instance " + instEndEL + " for feed " + feed.getName()
-                            + " is before the start instance " + instStartEL + " for cluster " + clusterName);
-                }
-
-                if (instEnd.after(feedEnd)) {
-                    throw new ValidationException("End instance " + instEndEL + " for feed " + feed.getName()
-                            + " is after the end of feed " + feedValidity.getEnd() + " for cluster " + clusterName);
-                }
-            }
-        } catch (ValidationException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
-    public static void validateFeedRetentionPeriod(String startInstance, Feed feed, String clusterName)
-        throws FalconException {
-
-        String feedRetention = FeedHelper.getCluster(feed, clusterName).getRetention().getLimit().toString();
-        ExpressionHelper evaluator = ExpressionHelper.get();
-
-        Date now = new Date();
-        ExpressionHelper.setReferenceDate(now);
-        Date instStart = evaluator.evaluate(startInstance, Date.class);
-        long feedDuration = evaluator.evaluate(feedRetention, Long.class);
-        Date feedStart = new Date(now.getTime() - feedDuration);
-
-        if (instStart.before(feedStart)) {
-            throw new ValidationException("StartInstance :" + startInstance + " of process is out of range for Feed: "
-                    + feed.getName() + "  in cluster: " + clusterName + "'s retention limit :" + feedRetention);
-        }
-    }
-
-    // Maps to the Oozie coordinator's dataset fields
-    public static void validateInstance(Process process, Output output, Feed feed) throws FalconException {
-
-        try {
-            for (Cluster cluster : process.getClusters().getClusters()) {
-                String clusterName = cluster.getName();
-                org.apache.falcon.entity.v0.feed.Validity feedValidity = FeedHelper.getCluster(feed,
-                        clusterName).getValidity();
-                Date feedStart = feedValidity.getStart();
-                Date feedEnd = feedValidity.getEnd();
-
-                String instEL = output.getInstance();
-                ExpressionHelper evaluator = ExpressionHelper.get();
-                Validity processValidity = ProcessHelper.getCluster(process, clusterName).getValidity();
-                ExpressionHelper.setReferenceDate(processValidity.getStart());
-                Date inst = evaluator.evaluate(instEL, Date.class);
-                if (inst.before(feedStart)) {
-                    throw new ValidationException("Instance  " + instEL + " of feed " + feed.getName()
-                            + " is before the start of feed " + feedValidity.getStart() + " for cluster" + clusterName);
-                }
-
-                if (inst.after(feedEnd)) {
-                    throw new ValidationException("End instance " + instEL + " for feed " + feed.getName()
-                            + " is after the end of feed " + feedValidity.getEnd() + " for cluster" + clusterName);
-                }
-            }
-        } catch (ValidationException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
-    public static void validateInputPartition(Input input, Feed feed) throws ValidationException {
-        String[] parts = input.getPartition().split("/");
-        if (feed.getPartitions() == null || feed.getPartitions().getPartitions().isEmpty()
-                || feed.getPartitions().getPartitions().size() < parts.length) {
-            throw new ValidationException("Partition specification in input " + input.getName() + " is wrong");
-        }
-    }
-
-    public static void validateFeedDefinedForCluster(Feed feed, String clusterName) throws FalconException {
-        if (FeedHelper.getCluster(feed, clusterName) == null) {
-            throw new ValidationException("Feed " + feed.getName() + " is not defined for cluster " + clusterName);
-        }
-    }
-}
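
As a worked illustration of the retention check, assuming Falcon's EL now(h,m)
offsets from the reference date: with a feed retaining hours(24) on a
hypothetical cluster "primary", a start instance of now(-48,0) evaluates roughly
48 hours back, which lands before now minus the 24-hour retention window:

    // hypothetical feed with retention limit hours(24) on cluster "primary"
    Feed feed = ConfigurationStore.get().get(EntityType.FEED, "clicks-feed");
    CrossEntityValidations.validateFeedRetentionPeriod("now(-48,0)", feed, "primary");
    // -> ValidationException: start instance is out of range for the retention limit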

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
deleted file mode 100644
index 998f952..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/DatasourceEntityParser.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.DatasourceHelper;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.datasource.ACL;
-import org.apache.falcon.entity.v0.datasource.Datasource;
-import org.apache.falcon.entity.v0.datasource.Interfacetype;
-import org.apache.falcon.util.HdfsClassLoader;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.util.Arrays;
-import java.util.Properties;
-
-/**
- * Parser for DataSource entity definition.
- */
-
-public class DatasourceEntityParser extends EntityParser<Datasource> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DatasourceEntityParser.class);
-
-    public DatasourceEntityParser() {
-        super(EntityType.DATASOURCE);
-    }
-
-    @Override
-    public void validate(Datasource db) throws FalconException {
-        try {
-            ClassLoader hdfsClassLoader = HdfsClassLoader.load(db.getName(), db.getDriver().getJars());
-            validateInterface(db, Interfacetype.READONLY, hdfsClassLoader);
-            validateInterface(db, Interfacetype.WRITE, hdfsClassLoader);
-            validateACL(db);
-        } catch (IOException io) {
-            throw new ValidationException("Unable to copy driver jars to local dir: "
-                    + Arrays.toString(db.getDriver().getJars().toArray()));
-        }
-    }
-
-    private static void validateInterface(Datasource db, Interfacetype interfacetype, ClassLoader hdfsClassLoader)
-        throws ValidationException {
-        String endpoint = null;
-        Properties userPasswdInfo = null;
-        try {
-            if (interfacetype == Interfacetype.READONLY) {
-                endpoint = DatasourceHelper.getReadOnlyEndpoint(db);
-                userPasswdInfo = DatasourceHelper.fetchReadPasswordInfo(db);
-            } else if (interfacetype == Interfacetype.WRITE) {
-                endpoint = DatasourceHelper.getWriteEndpoint(db);
-                userPasswdInfo = DatasourceHelper.fetchWritePasswordInfo(db);
-            }
-            if (StringUtils.isNotBlank(endpoint)) {
-                LOG.info("Validating {} endpoint {} connection.", interfacetype.value(), endpoint);
-                validateConnection(hdfsClassLoader, db.getDriver().getClazz(), endpoint, userPasswdInfo);
-            }
-        } catch (FalconException fe) {
-            throw new ValidationException(String.format(
-                    "Cannot validate '%s' interface '%s' of database entity '%s' due to '%s'",
-                    interfacetype, endpoint, db.getName(), fe.getMessage()));
-        }
-    }
-
-    private static void validateConnection(ClassLoader hdfsClassLoader, String driverClass,
-                                    String connectUrl, Properties userPasswdInfo)
-        throws FalconException {
-        ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
-        LOG.info("Preserving current classloader: {}", previousClassLoader);
-        try {
-            Thread.currentThread().setContextClassLoader(hdfsClassLoader);
-            LOG.info("Setting context classloader to: {}", hdfsClassLoader);
-            java.sql.Driver driver = (java.sql.Driver) hdfsClassLoader.loadClass(driverClass).newInstance();
-            LOG.info("Validating connection URL: {} using driver: {}", connectUrl, driver.getClass().getName());
-            Connection con = driver.connect(connectUrl, userPasswdInfo);
-            if (con == null) {
-                throw new FalconException("Driver.connect() returned null for URL: " + connectUrl);
-            }
-            con.close(); // release the probe connection once validation succeeds
-        } catch (Exception ex) {
-            LOG.error("Exception while validating connection : ", ex);
-            throw new FalconException(ex);
-        } finally {
-            Thread.currentThread().setContextClassLoader(previousClassLoader);
-            LOG.info("Restoring original classloader {}", previousClassLoader.toString());
-        }
-    }
-
-    /**
-     * Validate ACL if authorization is enabled.
-     *
-     * @param  db database entity
-     * @throws ValidationException
-     */
-    private void validateACL(Datasource db) throws ValidationException {
-        if (isAuthorizationDisabled) {
-            return;
-        }
-
-        // Validate the entity owner is logged-in, authenticated user if authorization is enabled
-        final ACL dbACL = db.getACL();
-        if (dbACL == null) {
-            throw new ValidationException("Datasource ACL cannot be empty for:  " + db.getName());
-        }
-
-        validateACLOwnerAndGroup(dbACL);
-
-        try {
-            authorize(db.getName(), dbACL);
-        } catch (AuthorizationException e) {
-            throw new ValidationException(e);
-        }
-    }
-}
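
The connection probe above relies on a common pattern: temporarily swap the thread's
context classloader to one that can see the driver jars, load the JDBC driver reflectively
through that loader, and restore the original loader in a finally block. A self-contained
sketch of that pattern under the same assumptions (the driver class name, URL and
credentials are caller-supplied placeholders, not Falcon APIs):

    import java.sql.Connection;
    import java.sql.Driver;
    import java.util.Properties;

    /** Minimal sketch: probe a JDBC endpoint under a supplied classloader, then restore it. */
    public final class ConnectionProbe {

        static void probe(ClassLoader driverLoader, String driverClass,
                          String url, Properties credentials) throws Exception {
            ClassLoader previous = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(driverLoader);
                // Load and instantiate the driver through the supplied loader, not the app loader.
                Driver driver = (Driver) driverLoader.loadClass(driverClass)
                        .getDeclaredConstructor().newInstance();
                try (Connection con = driver.connect(url, credentials)) {
                    // Per the JDBC contract, Driver.connect() returns null for a URL it
                    // does not understand, so a null check is required here.
                    if (con == null) {
                        throw new IllegalStateException("Driver returned null for URL: " + url);
                    }
                }
            } finally {
                Thread.currentThread().setContextClassLoader(previous); // always restore
            }
        }
    }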

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
deleted file mode 100644
index 05b204d..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParser.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.AccessControlList;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.Unmarshaller;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Generic abstract entity parser; the concrete FEED, PROCESS, CLUSTER and DATASOURCE
- * parsers extend this class to implement type-specific parsing and validation.
- *
- * @param <T> of type Entity
- */
-public abstract class EntityParser<T extends Entity> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EntityParser.class);
-
-    private final EntityType entityType;
-    protected final boolean isAuthorizationDisabled;
-
-    protected EntityParser(EntityType entityType) {
-        this.entityType = entityType;
-        isAuthorizationDisabled = !SecurityUtil.isAuthorizationEnabled();
-    }
-
-    public EntityType getEntityType() {
-        return this.entityType;
-    }
-
-    /**
-     * Parses the given entity XML string and validates it using JAXB.
-     *
-     * @param xmlString - Entity XML
-     * @return Entity - Java object
-     * @throws FalconException
-     */
-    public Entity parseAndValidate(String xmlString) throws FalconException {
-        InputStream inputStream = null;
-        try {
-            inputStream = new ByteArrayInputStream(xmlString.getBytes(StandardCharsets.UTF_8));
-            return parseAndValidate(inputStream);
-        } finally {
-            IOUtils.closeQuietly(inputStream);
-        }
-    }
-
-    /**
-     * Parses xml stream.
-     *
-     * @param xmlStream stream
-     * @return entity
-     * @throws FalconException
-     */
-    @SuppressWarnings("unchecked")
-    public T parse(InputStream xmlStream) throws FalconException {
-        try {
-            // parse against schema
-            Unmarshaller unmarshaller = entityType.getUnmarshaller();
-            T entity = (T) unmarshaller.unmarshal(xmlStream);
-            LOG.info("Parsed Entity: {}", entity.getName());
-            return entity;
-        } catch (Exception e) {
-            throw new FalconException(e);
-        }
-    }
-
-    public T parseAndValidate(InputStream xmlStream) throws FalconException {
-        T entity = parse(xmlStream);
-        validate(entity);
-        return entity;
-    }
-
-    protected void validateEntityExists(EntityType type, String name) throws FalconException {
-        if (ConfigurationStore.get().get(type, name) == null) {
-            throw new ValidationException("Referenced " + type + " " + name + " is not registered");
-        }
-    }
-
-    public abstract void validate(T entity) throws FalconException;
-
-    /**
-     * Checks that the ACL owner is a valid user by fetching the owner's groups,
-     * and that the ACL group is one of those groups.
-     * Consequently, a user cannot put a group in an ACL unless the user belongs to that group.
-     *
-     * @param acl  entity ACL
-     * @throws org.apache.falcon.entity.parser.ValidationException
-     */
-    protected void validateACLOwnerAndGroup(AccessControlList acl) throws ValidationException {
-        String aclOwner = acl.getOwner();
-        String aclGroup = acl.getGroup();
-
-        try {
-            UserGroupInformation proxyACLUser = UserGroupInformation.createProxyUser(
-                    aclOwner, UserGroupInformation.getLoginUser());
-            Set<String> groups = new HashSet<String>(Arrays.asList(proxyACLUser.getGroupNames()));
-            if (!groups.contains(aclGroup)) {
-                throw new AuthorizationException("Invalid group: " + aclGroup
-                        + " for user: " + aclOwner);
-            }
-        } catch (IOException e) {
-            throw new ValidationException("Invalid acl owner " + aclOwner
-                    + ", does not exist or does not belong to group: " + aclGroup);
-        }
-    }
-
-    /**
-     * Validate if the entity owner is the logged-in authenticated user.
-     *
-     * @param entityName  entity name
-     * @param acl         entity ACL
-     * @throws AuthorizationException
-     */
-    protected void authorize(String entityName,
-                             AccessControlList acl) throws AuthorizationException {
-        try {
-            SecurityUtil.getAuthorizationProvider().authorizeEntity(entityName,
-                    getEntityType().name(), acl, "submit", CurrentUser.getAuthenticatedUGI());
-        } catch (FalconException | IOException e) {
-            throw new AuthorizationException(e);
-        }
-    }
-}
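
The ACL check above hinges on Hadoop's proxy-user facility: resolve the owner's groups via
UserGroupInformation and require the ACL group to be among them. A reduced sketch of just
that membership test (user and group names are illustrative, and the plain
IllegalArgumentException stands in for Falcon's exceptions):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.security.UserGroupInformation;

    /** Minimal sketch: verify an ACL's group is one of the owner's actual groups. */
    public final class AclGroupCheck {

        static void requireOwnerInGroup(String owner, String group) throws IOException {
            // Resolve the owner as a proxy of the currently logged-in user, then fetch groups.
            UserGroupInformation proxy = UserGroupInformation.createProxyUser(
                    owner, UserGroupInformation.getLoginUser());
            Set<String> groups = new HashSet<>(Arrays.asList(proxy.getGroupNames()));
            if (!groups.contains(group)) {
                throw new IllegalArgumentException(
                        "Invalid group: " + group + " for user: " + owner);
            }
        }
    }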

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java b/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
deleted file mode 100644
index b497770..0000000
--- a/common/src/main/java/org/apache/falcon/entity/parser/EntityParserFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.parser;
-
-import org.apache.falcon.entity.v0.EntityType;
-
-/**
- * Factory Class which returns the Parser based on the EntityType.
- */
-public final class EntityParserFactory {
-
-    private EntityParserFactory() {
-    }
-
-    /**
-     * Ties each EntityType to its entity class in one place so that entities can be
-     * unmarshalled easily by the concrete parsers via JAXB, based on the class type.
-     *
-     * @param entityType - entity type
-     * @return concrete parser based on entity type
-     */
-    public static EntityParser getParser(final EntityType entityType) {
-
-        switch (entityType) {
-        case PROCESS:
-            return new ProcessEntityParser();
-        case FEED:
-            return new FeedEntityParser();
-        case CLUSTER:
-            return new ClusterEntityParser();
-        case DATASOURCE:
-            return new DatasourceEntityParser();
-        default:
-            throw new IllegalArgumentException("Unhandled entity type: " + entityType);
-        }
-    }
-
-}
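
Callers obtained a parser from this factory by entity type and then ran the
parse-and-validate flow. A typical call site for the removed classes would have looked
roughly like the following (the submit() wrapper and its stream source are hypothetical,
only getParser() and parseAndValidate() come from the code above):

    import java.io.InputStream;

    import org.apache.falcon.entity.parser.EntityParser;
    import org.apache.falcon.entity.parser.EntityParserFactory;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.EntityType;

    /** Illustrative call site for the removed factory. */
    public final class SubmitExample {

        static Entity submit(EntityType type, InputStream definition) throws Exception {
            EntityParser parser = EntityParserFactory.getParser(type);
            // Unmarshals against the type's JAXB schema, then runs type-specific validation.
            return parser.parseAndValidate(definition);
        }
    }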