Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:26:24 UTC

[38/51] [partial] falcon git commit: FALCON-1830 Removed code source directories and updated pom

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
deleted file mode 100644
index e4d9385..0000000
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.v0;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Cluster;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.service.ConfigurationChangeListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * An in-memory graph of entities and the relationships among them.
- */
-public final class EntityGraph implements ConfigurationChangeListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EntityGraph.class);
-
-    private static EntityGraph instance = new EntityGraph();
-
-    private Map<Node, Set<Node>> graph = new ConcurrentHashMap<Node, Set<Node>>();
-
-    private EntityGraph() {
-    }
-
-    public static EntityGraph get() {
-        return instance;
-    }
-
-    public Set<Entity> getDependents(Entity entity) throws FalconException {
-        Node entityNode = new Node(entity.getEntityType(), entity.getName());
-        if (graph.containsKey(entityNode)) {
-            ConfigurationStore store = ConfigurationStore.get();
-            Set<Entity> dependents = new HashSet<Entity>();
-            for (Node node : graph.get(entityNode)) {
-                Entity dependentEntity = store.get(node.type, node.name);
-                if (dependentEntity != null) {
-                    dependents.add(dependentEntity);
-                } else {
-                    LOG.error("Dependent entity {} was not found in configuration store.", node);
-                }
-            }
-            return dependents;
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public void onAdd(Entity entity) throws FalconException {
-        Map<Node, Set<Node>> nodeEdges = null;
-        switch (entity.getEntityType()) {
-        case PROCESS:
-            nodeEdges = getEdgesFor((Process) entity);
-            break;
-        case FEED:
-            nodeEdges = getEdgesFor((Feed) entity);
-            break;
-        default:
-        }
-        if (nodeEdges == null) {
-            return;
-        }
-        LOG.debug("Adding edges for {}: {}", entity.getName(), nodeEdges);
-
-        for (Map.Entry<Node, Set<Node>> entry : nodeEdges.entrySet()) {
-            LOG.debug("Adding edges : {} for {}", entry.getValue(), entry.getKey());
-            if (graph.containsKey(entry.getKey())) {
-                graph.get(entry.getKey()).addAll(entry.getValue());
-            } else {
-                graph.put(entry.getKey(), entry.getValue());
-            }
-        }
-        LOG.debug("Merged edges to graph {}", entity.getName());
-    }
-
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-        Map<Node, Set<Node>> nodeEdges = null;
-        switch (entity.getEntityType()) {
-        case PROCESS:
-            nodeEdges = getEdgesFor((Process) entity);
-            break;
-        case FEED:
-            nodeEdges = getEdgesFor((Feed) entity);
-            break;
-        default:
-        }
-        if (nodeEdges == null) {
-            return;
-        }
-
-        for (Map.Entry<Node, Set<Node>> entry : nodeEdges.entrySet()) {
-            if (graph.containsKey(entry.getKey())) {
-                graph.get(entry.getKey()).removeAll(entry.getValue());
-                if (graph.get(entry.getKey()).isEmpty()) {
-                    graph.remove(entry.getKey());
-                }
-            }
-        }
-    }
-
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
-        onRemove(oldEntity);
-        onAdd(newEntity);
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-
-    private Map<Node, Set<Node>> getEdgesFor(Process process) {
-        Map<Node, Set<Node>> nodeEdges = new HashMap<Node, Set<Node>>();
-        Node processNode = new Node(EntityType.PROCESS, process.getName());
-        nodeEdges.put(processNode, new HashSet<Node>());
-        Set<Node> processEdges = nodeEdges.get(processNode);
-        if (process.getInputs() != null) {
-            for (Input input : process.getInputs().getInputs()) {
-                Node feedNode = new Node(EntityType.FEED, input.getFeed());
-                if (!nodeEdges.containsKey(feedNode)) {
-                    nodeEdges.put(feedNode, new HashSet<Node>());
-                }
-                Set<Node> feedEdges = nodeEdges.get(feedNode);
-                processEdges.add(feedNode);
-                feedEdges.add(processNode);
-            }
-        }
-        if (process.getOutputs() != null) {
-            for (Output output : process.getOutputs().getOutputs()) {
-                Node feedNode = new Node(EntityType.FEED, output.getFeed());
-                if (!nodeEdges.containsKey(feedNode)) {
-                    nodeEdges.put(feedNode, new HashSet<Node>());
-                }
-                Set<Node> feedEdges = nodeEdges.get(feedNode);
-                processEdges.add(feedNode);
-                feedEdges.add(processNode);
-            }
-        }
-
-        for (Cluster cluster : process.getClusters().getClusters()) {
-            Node clusterNode = new Node(EntityType.CLUSTER, cluster.getName());
-            processEdges.add(clusterNode);
-            nodeEdges.put(clusterNode, new HashSet<Node>());
-            nodeEdges.get(clusterNode).add(processNode);
-        }
-
-        return nodeEdges;
-    }
-
-    private Map<Node, Set<Node>> getEdgesFor(Feed feed) {
-        Map<Node, Set<Node>> nodeEdges = new HashMap<Node, Set<Node>>();
-        Node feedNode = new Node(EntityType.FEED, feed.getName());
-        Set<Node> feedEdges = new HashSet<Node>();
-        nodeEdges.put(feedNode, feedEdges);
-
-        for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
-            Node clusterNode = new Node(EntityType.CLUSTER, cluster.getName());
-            if (!nodeEdges.containsKey(clusterNode)) {
-                nodeEdges.put(clusterNode, new HashSet<Node>());
-            }
-            Set<Node> clusterEdges = nodeEdges.get(clusterNode);
-            feedEdges.add(clusterNode);
-            clusterEdges.add(feedNode);
-
-            if (FeedHelper.isImportEnabled(cluster)) {
-                Node dbNode = new Node(EntityType.DATASOURCE, FeedHelper.getImportDatasourceName(cluster));
-                if (!nodeEdges.containsKey(dbNode)) {
-                    nodeEdges.put(dbNode, new HashSet<Node>());
-                }
-                Set<Node> dbEdges = nodeEdges.get(dbNode);
-                feedEdges.add(dbNode);
-                dbEdges.add(feedNode);
-            }
-        }
-        return nodeEdges;
-    }
-
-    /**
-     * Node element in the graph.
-     */
-    private static final class Node {
-
-        private final EntityType type;
-        private final String name;
-
-        private Node(EntityType type, String name) {
-            this.type = type;
-            this.name = name;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-
-            Node node = (Node) o;
-
-            boolean nameNotEqual = name != null ? !name.equals(node.name) : node.name != null;
-
-            if (nameNotEqual) {
-                return false;
-            }
-            if (type != node.type) {
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = type != null ? type.hashCode() : 0;
-            result = 31 * result + (name != null ? name.hashCode() : 0);
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "(" + type + ") " + name;
-        }
-    }
-}
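
For reference, the EntityGraph removed above was a singleton ConfigurationChangeListener that kept bidirectional feed/process/cluster edges in memory; callers queried it through getDependents(). A minimal sketch of that call pattern, assuming a running Falcon with entities already registered (the feed name "clicks" is illustrative, not from this commit):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.EntityGraph;
    import org.apache.falcon.entity.v0.EntityType;

    import java.util.Set;

    public final class DependentsExample {
        public static void main(String[] args) throws FalconException {
            // Look up a registered feed; "clicks" is an assumed entity name.
            Entity feed = ConfigurationStore.get().get(EntityType.FEED, "clicks");
            // getDependents() returns null when the entity has no edges in the graph.
            Set<Entity> dependents = EntityGraph.get().getDependents(feed);
            if (dependents != null) {
                for (Entity dep : dependents) {
                    System.out.println(dep.getEntityType() + ": " + dep.getName());
                }
            }
        }
    }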

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
deleted file mode 100644
index 4c7e913..0000000
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.v0;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Helper methods to check integrity of entity.
- */
-public final class EntityIntegrityChecker {
-
-    private EntityIntegrityChecker() {}
-
-    public static Pair<String, EntityType>[] referencedBy(Entity entity) throws FalconException {
-        Set<Entity> deps = EntityGraph.get().getDependents(entity);
-        if (deps == null) {
-            return null;
-        }
-
-        switch (entity.getEntityType()) {
-        case CLUSTER:
-            return filter(deps, EntityType.FEED, EntityType.PROCESS);
-
-        case FEED:
-            return filter(deps, EntityType.PROCESS);
-
-        case DATASOURCE:
-            return filter(deps, EntityType.FEED);
-
-        default:
-            return null;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private static Pair<String, EntityType>[] filter(Set<Entity> deps, EntityType... types) {
-        List<Pair<String, EntityType>> filteredSet = new ArrayList<Pair<String, EntityType>>();
-        List<EntityType> validTypes = Arrays.asList(types);
-        for (Entity dep : deps) {
-            if (validTypes.contains(dep.getEntityType())) {
-                filteredSet.add(Pair.of(dep.getName(), dep.getEntityType()));
-            }
-        }
-        return filteredSet.toArray(new Pair[0]);
-    }
-}
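
EntityIntegrityChecker builds on that graph to answer "what still references this entity?" before a delete is allowed. A hedged sketch of its use (the cluster name "primary-cluster" is an assumption):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.Pair;
    import org.apache.falcon.entity.store.ConfigurationStore;
    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.EntityIntegrityChecker;
    import org.apache.falcon.entity.v0.EntityType;

    public final class ReferencedByExample {
        public static void main(String[] args) throws FalconException {
            Entity cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "primary-cluster");
            // For a cluster, referencedBy() narrows dependents to feeds and processes.
            Pair<String, EntityType>[] refs = EntityIntegrityChecker.referencedBy(cluster);
            if (refs != null && refs.length > 0) {
                // A non-empty result would typically veto deletion of the cluster.
                for (Pair<String, EntityType> ref : refs) {
                    System.out.println("still referenced by: " + ref);
                }
            }
        }
    }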

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java b/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
deleted file mode 100644
index cad196b..0000000
--- a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.entity.v0;
-
-import org.apache.falcon.FalconException;
-
-/**
- * This exception is thrown when an unschedulable entity,
- * such as CLUSTER, is subjected to actions like Schedule,
- * Suspend or Resume.
- */
-public class UnschedulableEntityException extends FalconException {
-
-    private static final long serialVersionUID = -1134342662497698943L;
-
-    public UnschedulableEntityException(Exception e) {
-        super(e);
-    }
-
-    public UnschedulableEntityException(String message, Exception e) {
-        super(message, e);
-    }
-
-    public UnschedulableEntityException(String message) {
-        super(message);
-    }
-}
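
The exception above marks operations that only make sense for schedulable entities. A small sketch of the guard pattern it supports (the explicit CLUSTER check is an assumption about the caller, not code from this commit):

    import org.apache.falcon.entity.v0.Entity;
    import org.apache.falcon.entity.v0.EntityType;
    import org.apache.falcon.entity.v0.UnschedulableEntityException;

    public final class ScheduleGuard {
        public static void checkSchedulable(Entity entity) throws UnschedulableEntityException {
            // CLUSTER entities carry no schedule of their own and cannot be scheduled.
            if (entity.getEntityType() == EntityType.CLUSTER) {
                throw new UnschedulableEntityException("Entity (" + entity.getEntityType()
                        + ") " + entity.getName() + " cannot be scheduled");
            }
        }
    }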

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
deleted file mode 100644
index 65aaeba..0000000
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.expression;
-
-import org.apache.commons.el.ExpressionEvaluatorImpl;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-import javax.servlet.jsp.el.FunctionMapper;
-import javax.servlet.jsp.el.VariableResolver;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Properties;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Helper for evaluating expressions.
- */
-public final class ExpressionHelper implements FunctionMapper, VariableResolver {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ExpressionHelper.class);
-    private static final ExpressionHelper INSTANCE = new ExpressionHelper();
-
-    private static final ThreadLocal<Properties> THREAD_VARIABLES = new ThreadLocal<Properties>();
-
-    private static final Pattern SYS_PROPERTY_PATTERN = Pattern.compile("\\$\\{[A-Za-z0-9_.]+\\}");
-
-    private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
-    private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
-
-    public static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>() {
-        @Override
-        protected SimpleDateFormat initialValue() {
-            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
-            format.setTimeZone(TimeZone.getTimeZone("UTC"));
-            return format;
-        }
-    };
-
-    public static ExpressionHelper get() {
-        return INSTANCE;
-    }
-
-    private enum DayOfWeek {
-        SUN, MON, TUE, WED, THU, FRI, SAT
-    }
-
-    private ExpressionHelper() {
-    }
-
-    public <T> T evaluate(String expression, Class<T> clazz) throws FalconException {
-        return evaluateFullExpression("${" + expression + "}", clazz);
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T> T evaluateFullExpression(String expression, Class<T> clazz) throws FalconException {
-        try {
-            return (T) EVALUATOR.evaluate(expression, clazz, RESOLVER, RESOLVER);
-        } catch (ELException e) {
-            throw new FalconException("Unable to evaluate " + expression, e);
-        }
-    }
-
-    @Override
-    public Method resolveFunction(String prefix, String name) {
-        for (Method method : ExpressionHelper.class.getDeclaredMethods()) {
-            if (method.getName().equals(name)) {
-                return method;
-            }
-        }
-        throw new UnsupportedOperationException("Not found " + prefix + ":" + name);
-    }
-
-    public void setPropertiesForVariable(Properties properties) {
-        THREAD_VARIABLES.set(properties);
-    }
-
-    @Override
-    public Object resolveVariable(String field) {
-        return THREAD_VARIABLES.get().get(field);
-    }
-
-    private static ThreadLocal<Date> referenceDate = new ThreadLocal<Date>();
-
-    public static void setReferenceDate(Date date) {
-        referenceDate.set(date);
-        Properties variables = getTimeVariables(date, TimeZone.getTimeZone("UTC"));
-        THREAD_VARIABLES.set(variables);
-    }
-
-    public static Properties getTimeVariables(Date date, TimeZone tz) {
-        Properties vars = new Properties();
-        Calendar cal = Calendar.getInstance(tz);
-        cal.setTime(date);
-        vars.put(FeedDataPath.VARS.YEAR.name(), String.format("%04d", cal.get(Calendar.YEAR)));
-        vars.put(FeedDataPath.VARS.MONTH.name(), String.format("%02d", (cal.get(Calendar.MONTH) + 1)));
-        vars.put(FeedDataPath.VARS.DAY.name(), String.format("%02d", cal.get(Calendar.DAY_OF_MONTH)));
-        vars.put(FeedDataPath.VARS.HOUR.name(), String.format("%02d", cal.get(Calendar.HOUR_OF_DAY)));
-        vars.put(FeedDataPath.VARS.MINUTE.name(), String.format("%02d", cal.get(Calendar.MINUTE)));
-        return vars;
-    }
-
-    private static int getDayOffset(String weekDayName) {
-        int day;
-        Calendar nominalTime = Calendar.getInstance();
-        nominalTime.setTimeZone(TimeZone.getTimeZone("UTC"));
-        nominalTime.setTime(referenceDate.get());
-        int currentWeekDay = nominalTime.get(Calendar.DAY_OF_WEEK);
-        int weekDay = DayOfWeek.valueOf(weekDayName).ordinal() + 1; //to map to Calendar.SUNDAY ...
-        day = weekDay - currentWeekDay;
-        if (weekDay > currentWeekDay) {
-            day = day - 7;
-        }
-        return day;
-    }
-
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SF_SWITCH_FALLTHROUGH"})
-    private static Date getRelative(Date date, int boundary, int month, int day, int hour, int minute) {
-        Calendar dsInstanceCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-        dsInstanceCal.setTime(date);
-        switch (boundary) {
-        case Calendar.YEAR:
-            dsInstanceCal.set(Calendar.MONTH, 0);
-        case Calendar.MONTH:
-            dsInstanceCal.set(Calendar.DAY_OF_MONTH, 1);
-        case Calendar.DAY_OF_MONTH:
-            dsInstanceCal.set(Calendar.HOUR_OF_DAY, 0);
-        case Calendar.HOUR:
-            dsInstanceCal.set(Calendar.MINUTE, 0);
-            dsInstanceCal.set(Calendar.SECOND, 0);
-            dsInstanceCal.set(Calendar.MILLISECOND, 0);
-            break;
-        case Calendar.SECOND:
-            break;
-        default:
-            throw new IllegalArgumentException("Invalid boundary " + boundary);
-        }
-
-        dsInstanceCal.add(Calendar.YEAR, 0);
-        dsInstanceCal.add(Calendar.MONTH, month);
-        dsInstanceCal.add(Calendar.DAY_OF_MONTH, day);
-        dsInstanceCal.add(Calendar.HOUR_OF_DAY, hour);
-        dsInstanceCal.add(Calendar.MINUTE, minute);
-        return dsInstanceCal.getTime();
-    }
-
-    public static Date now(int hour, int minute) {
-        return getRelative(referenceDate.get(), Calendar.SECOND, 0, 0, hour, minute);
-    }
-
-    public static Date today(int hour, int minute) {
-        return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, 0, hour, minute);
-    }
-
-    public static Date yesterday(int hour, int minute) {
-        return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, -1, hour, minute);
-    }
-
-    public static Date currentMonth(int day, int hour, int minute) {
-        return getRelative(referenceDate.get(), Calendar.MONTH, 0, day, hour, minute);
-    }
-
-    public static Date lastMonth(int day, int hour, int minute) {
-        return getRelative(referenceDate.get(), Calendar.MONTH, -1, day, hour, minute);
-    }
-
-    public static Date currentWeek(String weekDay, int hour, int minute) {
-        int day = getDayOffset(weekDay);
-        return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, day, hour, minute);
-    }
-
-    public static Date lastWeek(String weekDay, int hour, int minute) {
-        int day = getDayOffset(weekDay);
-        return getRelative(referenceDate.get(), Calendar.DAY_OF_MONTH, 0, day - 7, hour, minute);
-    }
-
-    public static Date currentYear(int month, int day, int hour, int minute) {
-        return getRelative(referenceDate.get(), Calendar.YEAR, month, day, hour, minute);
-    }
-
-    public static Date lastYear(int month, int day, int hour, int minute) {
-        return getRelative(referenceDate.get(), Calendar.YEAR, month - 12, day, hour, minute);
-    }
-
-    public static Date latest(int n) {
-        // bypass Falcon validations
-        return referenceDate.get();
-    }
-
-    public static Date future(int n, int limit) {
-        // bypass Falcon validations
-        return referenceDate.get();
-    }
-
-    public static long hours(int val) {
-        return TimeUnit.HOURS.toMillis(val);
-    }
-
-    public static long minutes(int val) {
-        return TimeUnit.MINUTES.toMillis(val);
-    }
-
-    public static long days(int val) {
-        return TimeUnit.DAYS.toMillis(val);
-    }
-
-    public static long months(int val) {
-        return val * days(31);
-    }
-
-    public static long years(int val) {
-        return val * days(366);
-    }
-
-    public static String substitute(String originalValue) {
-        return substitute(originalValue, System.getProperties());
-    }
-
-    public static String substitute(String originalValue, Properties properties) {
-        Matcher envVarMatcher = SYS_PROPERTY_PATTERN.matcher(originalValue);
-        while (envVarMatcher.find()) {
-            String envVar = originalValue.substring(envVarMatcher.start() + 2,
-                    envVarMatcher.end() - 1);
-            String envVal = properties.getProperty(envVar, System.getenv(envVar));
-
-            envVar = "\\$\\{" + envVar + "\\}";
-            if (envVal != null) {
-                originalValue = originalValue.replaceAll(envVar, Matcher.quoteReplacement(envVal));
-                envVarMatcher = SYS_PROPERTY_PATTERN.matcher(originalValue);
-            }
-        }
-        return originalValue;
-    }
-
-}
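
ExpressionHelper wires Falcon's EL date functions (now, today, yesterday, currentMonth, ...) to a thread-local reference date, so evaluation is deterministic per request. A minimal sketch of evaluating one expression (the expression itself is illustrative):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.expression.ExpressionHelper;

    import java.util.Date;

    public final class ElExample {
        public static void main(String[] args) throws FalconException {
            // All date functions resolve relative to this thread-local reference date.
            ExpressionHelper.setReferenceDate(new Date());
            ExpressionHelper evaluator = ExpressionHelper.get();
            // evaluate() wraps the expression as ${...} before handing it to the EL evaluator.
            Date start = evaluator.evaluate("yesterday(0,0)", Date.class);
            System.out.println("yesterday(0,0) = " + ExpressionHelper.FORMATTER.get().format(start));
        }
    }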

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
deleted file mode 100644
index d288925..0000000
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.group;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
-
-/**
- * Represents a logical group of feeds; feeds with the same frequency and
- * date pattern can belong to the same group.
- */
-public class FeedGroup {
-
-    public FeedGroup(String group, Frequency frequency, String path) {
-        this.name = group;
-        this.frequency = frequency;
-        this.datePattern = getDatePattern(path);
-        this.feeds = Collections
-                .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-    }
-
-    public static String getDatePattern(String path) {
-        Matcher matcher = FeedDataPath.PATTERN.matcher(path);
-        List<String> fields = new ArrayList<String>();
-        while (matcher.find()) {
-            String var = path.substring(matcher.start(), matcher.end());
-            fields.add(var);
-        }
-        Collections.sort(fields);
-        return fields.toString();
-    }
-
-    private String name;
-    private Frequency frequency;
-    private String datePattern;
-    private Set<String> feeds;
-
-    public Set<String> getFeeds() {
-        return feeds;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == null || !(obj instanceof FeedGroup)) {
-            return false;
-        }
-        FeedGroup group = (FeedGroup) obj;
-        return (this.name.equals(group.getName())
-                && this.frequency.equals(group.frequency)
-                && this.datePattern
-                .equals(group.datePattern));
-
-    }
-
-    @Override
-    public int hashCode() {
-        return 127 * name.hashCode() + 31 * frequency.hashCode() + datePattern.hashCode();
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public Frequency getFrequency() {
-        return frequency;
-    }
-
-    public String getDatePattern() {
-        return datePattern;
-    }
-
-    public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
-        return this.frequency.equals(feed.getFrequency())
-                && this.datePattern.equals(getDatePattern(
-                    FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA)));
-    }
-}
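
FeedGroup.getDatePattern() reduces a feed path to the sorted list of date variables it contains, which is what makes two feeds comparable for grouping (see canContainFeed() above). A small sketch (the path is an assumption):

    import org.apache.falcon.group.FeedGroup;

    public final class DatePatternExample {
        public static void main(String[] args) {
            String path = "/data/clicks/${YEAR}/${MONTH}/${DAY}";
            // Prints the sorted date variables found in the path,
            // e.g. [${DAY}, ${MONTH}, ${YEAR}].
            System.out.println(FeedGroup.getDatePattern(path));
        }
    }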

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
deleted file mode 100644
index a832366..0000000
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.group;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.service.ConfigurationChangeListener;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Maintains two-way mappings between feeds and groups.
- */
-public final class FeedGroupMap implements ConfigurationChangeListener {
-
-    private static final FeedGroupMap INSTANCE = new FeedGroupMap();
-    private Map<String, FeedGroup> groupsMapping = new ConcurrentHashMap<String, FeedGroup>();
-
-    private FeedGroupMap() {
-        // singleton
-    }
-
-    public static FeedGroupMap get() {
-        return INSTANCE;
-    }
-
-    public Map<String, FeedGroup> getGroupsMapping() {
-        return Collections.unmodifiableMap(groupsMapping);
-    }
-
-    @Override
-    public void onAdd(Entity entity) throws FalconException {
-
-        if (entity.getEntityType().equals(EntityType.FEED)) {
-            Feed feed = (Feed) entity;
-            if (feed.getGroups() == null || feed.getGroups().equals("")) {
-                return;
-            }
-            Set<FeedGroup> groupSet = getGroups(feed);
-            addGroups(feed.getName(), groupSet);
-        }
-    }
-
-    @Override
-    public void onRemove(Entity entity) throws FalconException {
-        if (entity.getEntityType().equals(EntityType.FEED)) {
-            Feed feed = (Feed) entity;
-            if (StringUtils.isEmpty(feed.getGroups())) {
-                return;
-            }
-            String[] groups = feed.getGroups().split(",");
-            for (String group : groups) {
-                groupsMapping.get(group).getFeeds().remove(entity.getName());
-                if (groupsMapping.get(group).getFeeds().size() == 0) {
-                    groupsMapping.remove(group);
-                }
-            }
-
-        }
-    }
-
-    @Override
-    public void onChange(Entity oldEntity, Entity newEntity)
-        throws FalconException {
-
-        onRemove(oldEntity);
-        onAdd(newEntity);
-    }
-
-    @Override
-    public void onReload(Entity entity) throws FalconException {
-        onAdd(entity);
-    }
-
-    private void addGroups(String feed, Set<FeedGroup> groups) {
-        for (FeedGroup group : groups) {
-            if (groupsMapping.containsKey(group.getName())) {
-                groupsMapping.get(group.getName()).getFeeds().add(feed);
-            } else {
-                group.getFeeds().add(feed);
-                groupsMapping.put(group.getName(), group);
-            }
-        }
-    }
-
-    public Set<FeedGroup> getGroups(String groups, Frequency frequency, String path) {
-        Set<FeedGroup> groupSet = new HashSet<FeedGroup>();
-        String[] groupArray = groups.split(",");
-        for (String group : groupArray) {
-            groupSet.add(new FeedGroup(group, frequency, path));
-        }
-        return groupSet;
-    }
-
-    public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) throws FalconException {
-        return getGroups(feed.getGroups(), feed.getFrequency(),
-                FeedHelper.createStorage(feed).getUriTemplate(LocationType.DATA));
-    }
-}
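
FeedGroupMap is the companion singleton that keeps the group-to-feeds mapping current through the onAdd()/onRemove() listener callbacks; consumers only read the unmodifiable view. A sketch:

    import org.apache.falcon.group.FeedGroup;
    import org.apache.falcon.group.FeedGroupMap;

    import java.util.Map;

    public final class GroupMapExample {
        public static void main(String[] args) {
            // Populated via listener callbacks as feeds are submitted or deleted.
            Map<String, FeedGroup> groups = FeedGroupMap.get().getGroupsMapping();
            for (Map.Entry<String, FeedGroup> entry : groups.entrySet()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue().getFeeds());
            }
        }
    }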

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java b/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
deleted file mode 100644
index e33d353..0000000
--- a/common/src/main/java/org/apache/falcon/hadoop/HadoopClientFactory.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hadoop;
-
-import org.apache.commons.lang.Validate;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
-
-/**
- * A factory implementation to dole out FileSystem handles based on the logged-in user.
- */
-public final class HadoopClientFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(HadoopClientFactory.class);
-
-    public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
-    public static final String MR_JT_ADDRESS_KEY = "mapreduce.jobtracker.address";
-    public static final String YARN_RM_ADDRESS_KEY = "yarn.resourcemanager.address";
-
-    public static final FsPermission READ_EXECUTE_PERMISSION =
-            new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE);
-    public static final FsPermission ALL_PERMISSION =
-            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-
-    private static final HadoopClientFactory INSTANCE = new HadoopClientFactory();
-
-    private HadoopClientFactory() {
-    }
-
-    public static HadoopClientFactory get() {
-        return INSTANCE;
-    }
-
-    /**
-     * This method is only used by Falcon internally to talk to the config store on HDFS.
-     *
-     * @param uri file system URI for config store.
-     * @return FileSystem created with the provided proxyUser/group.
-     * @throws org.apache.falcon.FalconException
-     *          if the filesystem could not be created.
-     */
-    public FileSystem createFalconFileSystem(final URI uri) throws FalconException {
-        Validate.notNull(uri, "uri cannot be null");
-
-        try {
-            Configuration conf = new Configuration();
-            if (UserGroupInformation.isSecurityEnabled()) {
-                conf.set(SecurityUtil.NN_PRINCIPAL, StartupProperties.get().getProperty(SecurityUtil.NN_PRINCIPAL));
-            }
-
-            return createFileSystem(UserGroupInformation.getLoginUser(), uri, conf);
-        } catch (IOException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + uri, e);
-        }
-    }
-
-    /**
-     * This method is only used by Falcon internally to talk to the config store on HDFS.
-     *
-     * @param conf configuration.
-     * @return FileSystem created with the provided proxyUser/group.
-     * @throws org.apache.falcon.FalconException
-     *          if the filesystem could not be created.
-     */
-    public FileSystem createFalconFileSystem(final Configuration conf)
-        throws FalconException {
-        Validate.notNull(conf, "configuration cannot be null");
-
-        String nameNode = getNameNode(conf);
-        try {
-            return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
-        } catch (IOException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
-        }
-    }
-
-    /**
-     * Return a FileSystem created with the authenticated proxy user for the specified conf.
-     *
-     * @param conf Configuration with all necessary information to create the FileSystem.
-     * @return FileSystem created with the provided proxyUser/group.
-     * @throws org.apache.falcon.FalconException
-     *          if the filesystem could not be created.
-     */
-    public FileSystem createProxiedFileSystem(final Configuration conf)
-        throws FalconException {
-        Validate.notNull(conf, "configuration cannot be null");
-
-        String nameNode = getNameNode(conf);
-        try {
-            return createProxiedFileSystem(new URI(nameNode), conf);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Exception while getting FileSystem for: " + nameNode, e);
-        }
-    }
-
-    private static String getNameNode(Configuration conf) {
-        return conf.get(FS_DEFAULT_NAME_KEY);
-    }
-
-    /**
-     * This method is called from within a workflow execution context.
-     *
-     * @param uri uri
-     * @return file system handle
-     * @throws FalconException
-     */
-    public FileSystem createProxiedFileSystem(final URI uri) throws FalconException {
-        return createProxiedFileSystem(uri, new Configuration());
-    }
-
-    public FileSystem createProxiedFileSystem(final URI uri,
-                                              final Configuration conf) throws FalconException {
-        Validate.notNull(uri, "uri cannot be null");
-
-        try {
-            return createFileSystem(CurrentUser.getProxyUGI(), uri, conf);
-        } catch (IOException e) {
-            throw new FalconException("Exception while getting FileSystem for proxy: "
-                + CurrentUser.getUser(), e);
-        }
-    }
-
-    /**
-     * Return a FileSystem created with the provided user for the specified URI.
-     *
-     * @param ugi user group information
-     * @param uri  file system URI.
-     * @param conf Configuration with all necessary information to create the FileSystem.
-     * @return FileSystem created with the provided user/group.
-     * @throws org.apache.falcon.FalconException
-     *          if the filesystem could not be created.
-     */
-    @SuppressWarnings("ResultOfMethodCallIgnored")
-    public FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
-                                       final Configuration conf) throws FalconException {
-        Validate.notNull(ugi, "ugi cannot be null");
-        Validate.notNull(conf, "configuration cannot be null");
-
-        try {
-            if (UserGroupInformation.isSecurityEnabled()) {
-                ugi.checkTGTAndReloginFromKeytab();
-            }
-        } catch (IOException ioe) {
-            throw new FalconException("Exception while getting FileSystem. Unable to check TGT for user "
-                    + ugi.getShortUserName(), ioe);
-        }
-
-        validateNameNode(uri, conf);
-
-        try {
-            // prevent Falcon impersonating itself; no need to use doAs
-            final String proxyUserName = ugi.getShortUserName();
-            if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
-                LOG.info("Creating FS for the login user {}, impersonation not required",
-                    proxyUserName);
-                return FileSystem.get(uri, conf);
-            }
-
-            LOG.info("Creating FS impersonating user {}", proxyUserName);
-            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-                public FileSystem run() throws Exception {
-                    return FileSystem.get(uri, conf);
-                }
-            });
-        } catch (InterruptedException ex) {
-            throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
-        } catch (IOException ex) {
-            throw new FalconException("Exception creating FileSystem:" + ex.getMessage(), ex);
-        }
-    }
-
-    /**
-     * This method validates that the execute URL can reach the MR endpoint.
-     *
-     * @param executeUrl jt url or RM url
-     * @throws IOException
-     */
-    public void validateJobClient(String executeUrl) throws IOException {
-        final JobConf jobConf = new JobConf();
-        jobConf.set(MR_JT_ADDRESS_KEY, executeUrl);
-        jobConf.set(YARN_RM_ADDRESS_KEY, executeUrl);
-
-        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-        try {
-            JobClient jobClient = loginUser.doAs(new PrivilegedExceptionAction<JobClient>() {
-                public JobClient run() throws Exception {
-                    return new JobClient(jobConf);
-                }
-            });
-
-            jobClient.getClusterStatus().getMapTasks();
-        } catch (InterruptedException e) {
-            throw new IOException("Exception creating job client:" + e.getMessage(), e);
-        }
-    }
-
-    public static FsPermission getDirDefaultPermission(Configuration conf) {
-        return getDirDefault().applyUMask(FsPermission.getUMask(conf));
-    }
-
-    public static FsPermission getFileDefaultPermission(Configuration conf) {
-        return getFileDefault().applyUMask(FsPermission.getUMask(conf));
-    }
-
-    public static FsPermission getDirDefault() {
-        return new FsPermission((short)511);
-    }
-
-    public static FsPermission getFileDefault() {
-        return new FsPermission((short)438);
-    }
-
-    public static void mkdirsWithDefaultPerms(FileSystem fs, Path path) throws IOException {
-        mkdirs(fs, path, getDirDefaultPermission(fs.getConf()));
-    }
-
-    public static void mkdirs(FileSystem fs, Path path,
-                              FsPermission permission) throws IOException {
-        if (!FileSystem.mkdirs(fs, path, permission)) {
-            throw new IOException("mkdir failed for " + path);
-        }
-    }
-
-    private void validateNameNode(URI uri, Configuration conf) throws FalconException {
-        String nameNode = uri.getAuthority();
-        if (nameNode == null) {
-            nameNode = getNameNode(conf);
-            if (nameNode != null) {
-                try {
-                    new URI(nameNode).getAuthority();
-                } catch (URISyntaxException ex) {
-                    throw new FalconException("Exception while getting FileSystem", ex);
-                }
-            }
-        }
-    }
-}
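
HadoopClientFactory picks between FileSystem.get() as the login user and a doAs() under the proxy UGI, depending on whether the requested user is the Falcon service user itself. A hedged sketch of the two common call sites (the namenode URI is an assumption, and the proxied variant requires CurrentUser to be authenticated first):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.hadoop.HadoopClientFactory;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.net.URI;

    public final class FsFactoryExample {
        public static void main(String[] args) throws FalconException, IOException {
            URI uri = URI.create("hdfs://namenode:8020"); // assumed endpoint
            // Falcon-internal handle (login user), e.g. for the config store.
            FileSystem falconFs = HadoopClientFactory.get().createFalconFileSystem(uri);
            // Proxied handle impersonating the currently authenticated user.
            FileSystem userFs = HadoopClientFactory.get().createProxiedFileSystem(uri);
            // Directory creation honoring the cluster umask.
            HadoopClientFactory.mkdirsWithDefaultPerms(userFs, new Path("/tmp/falcon-staging"));
            System.out.println(falconFs.getUri());
        }
    }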

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java b/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
deleted file mode 100644
index 5bcc2f8..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/AbstractPolicyBuilderFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Abstract factory class for feed lifecycle policy builders.
- */
-public abstract class AbstractPolicyBuilderFactory {
-
-    public abstract PolicyBuilder getPolicyBuilder(String policyName) throws FalconException;
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java b/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
deleted file mode 100644
index 833ad04..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/FeedLifecycleStage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.lifecycle;
-
-/**
- * Enum for valid lifecycle stages for the feed.
- */
-public enum FeedLifecycleStage {
-
-    RETENTION("AgeBasedDelete");
-
-    private String defaultPolicyName;
-
-    private FeedLifecycleStage(String defaultPolicyName) {
-        this.defaultPolicyName = defaultPolicyName;
-    }
-
-    public String getDefaultPolicyName() {
-        return defaultPolicyName;
-    }
-
-}
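
The enum ties each lifecycle stage to its default policy name, which the retention code below resolves when a feed does not name a policy explicitly. For instance:

    import org.apache.falcon.lifecycle.FeedLifecycleStage;

    public final class StageExample {
        public static void main(String[] args) {
            // Prints "AgeBasedDelete", the default policy for the retention stage.
            System.out.println(FeedLifecycleStage.RETENTION.getDefaultPolicyName());
        }
    }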

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
deleted file mode 100644
index be4e68c..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/LifecyclePolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.hadoop.fs.Path;
-
-import java.util.Properties;
-
-/**
- * Interface for all policies in feed lifecycle.
- */
-public interface LifecyclePolicy {
-
-    /**
-     * Returns the name of the policy. A policy's name must be unique, as it is used as an identifier.
-     * @return name of the policy
-     */
-    String getName();
-
-    /**
-     * Returns the stage to which the policy belongs.
-     * @return stage to which the policy belongs.
-     */
-    FeedLifecycleStage getStage();
-
-    /**
-     * Validates the configurations as per this policy.
-     * @param feed Parent feed for which the policy is configured.
-     * @param clusterName cluster to be used as context for validation.
-     * @throws FalconException
-     */
-    void validate(Feed feed, String clusterName) throws FalconException;
-
-    /**
-     * Builds workflow engine artifacts.
-     * @param cluster cluster to be used as context
-     * @param buildPath base path to be used for storing the artifacts.
-     * @param feed Parent feed.
-     * @return Properties to be passed to the caller e.g. bundle in case of oozie workflow engine.
-     * @throws FalconException
-     */
-    Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException;
-
-}
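
Concrete policies implement this pair of hooks: validate() runs per cluster at submit time, and build() emits workflow-engine artifacts at schedule time. A skeletal, hypothetical implementation of the contract (not part of this commit):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.v0.cluster.Cluster;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.lifecycle.FeedLifecycleStage;
    import org.apache.falcon.lifecycle.LifecyclePolicy;
    import org.apache.hadoop.fs.Path;

    import java.util.Properties;

    /** Hypothetical no-op policy illustrating the contract. */
    public class NoOpRetention implements LifecyclePolicy {
        @Override
        public String getName() {
            return "NoOpRetention"; // must be unique across policies
        }

        @Override
        public FeedLifecycleStage getStage() {
            return FeedLifecycleStage.RETENTION;
        }

        @Override
        public void validate(Feed feed, String clusterName) throws FalconException {
            // Nothing to validate for a no-op policy.
        }

        @Override
        public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
            // No engine artifacts produced; return an empty property set to the caller.
            return new Properties();
        }
    }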

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java b/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
deleted file mode 100644
index 5e5055b..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/PolicyBuilder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.hadoop.fs.Path;
-
-import java.util.Properties;
-
-/**
- * Interface to be implemented by all policy builders for a lifecycle policy.
- * A Builder builds workflow engine specific artifacts for a policy.
- */
-public interface PolicyBuilder {
-
-    Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException;
-
-    String getPolicyName();
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
deleted file mode 100644
index 8d735f9..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle.retention;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.parser.ValidationException;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.entity.v0.feed.RetentionStage;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.util.StartupProperties;
-
-import java.util.Date;
-
-/**
- * Retention policy that deletes all instances whose instance time is older than
- * a given limit. It creates the workflow and coordinators for this policy.
- */
-public class AgeBasedDelete extends RetentionPolicy {
-
-    public static final String LIMIT_PROPERTY_NAME = "retention.policy.agebaseddelete.limit";
-
-    @Override
-    public void validate(Feed feed, String clusterName) throws FalconException {
-        // validate that it is a valid cluster
-        Cluster cluster = FeedHelper.getCluster(feed, clusterName);
-        Frequency retentionLimit = getRetentionLimit(feed, clusterName);
-        if (cluster != null) {
-            validateLimitWithSla(feed, cluster, retentionLimit.toString());
-            validateLimitWithLateData(feed, cluster, retentionLimit.toString());
-            String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl",
-                    "org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory");
-            if ("org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory".equals(lifecycleEngine)) {
-                validateRetentionFrequencyForOozie(feed, clusterName);
-            }
-        }
-    }
-
-
-    private void validateRetentionFrequencyForOozie(Feed feed, String clusterName) throws FalconException {
-        // retention shouldn't be more frequent than hours(1) for Oozie Builders.
-        Frequency retentionFrequency = FeedHelper.getLifecycleRetentionFrequency(feed, clusterName);
-        if (retentionFrequency.getTimeUnit() == Frequency.TimeUnit.minutes
-                && retentionFrequency.getFrequencyAsInt() < 60) {
-            throw new ValidationException("Feed retention cannot be more frequent than hours(1)");
-        }
-    }
-
-    private void validateLimitWithLateData(Feed feed, Cluster cluster, String retention) throws FalconException {
-        ExpressionHelper evaluator = ExpressionHelper.get();
-        long retentionPeriod = evaluator.evaluate(retention, Long.class);
-
-        if (feed.getLateArrival() != null) {
-            String feedCutoff = feed.getLateArrival().getCutOff().toString();
-            long feedCutOffPeriod = evaluator.evaluate(feedCutoff, Long.class);
-            if (retentionPeriod < feedCutOffPeriod) {
-                throw new ValidationException(
-                        "Feed's retention limit: " + retention + " of referenced cluster " + cluster.getName()
-                                + " should be more than feed's late arrival cut-off period: " + feedCutoff
-                                + " for feed: " + feed.getName());
-            }
-        }
-    }
-
-    private void validateLimitWithSla(Feed feed, Cluster cluster, String retentionExpression) throws FalconException {
-        // test that slaHigh is less than retention
-        Sla clusterSla = FeedHelper.getSLA(cluster, feed);
-        if (clusterSla != null) {
-            ExpressionHelper evaluator = ExpressionHelper.get();
-            ExpressionHelper.setReferenceDate(new Date());
-
-            Frequency slaHighExpression = clusterSla.getSlaHigh();
-            Date slaHigh = new Date(evaluator.evaluate(slaHighExpression.toString(), Long.class));
-
-            Date retention = new Date(evaluator.evaluate(retentionExpression, Long.class));
-            if (slaHigh.after(retention)) {
-                throw new ValidationException("slaHigh of Feed: " + slaHighExpression
-                        + " is greater than retention of the feed: " + retentionExpression
-                        + " for cluster: " + cluster.getName()
-                );
-            }
-        }
-    }
-
-    public Frequency getRetentionLimit(Feed feed, String clusterName) throws FalconException {
-        RetentionStage retention = FeedHelper.getRetentionStage(feed, clusterName);
-        if (retention != null) {
-            String limit = null;
-            for (Property property : retention.getProperties().getProperties()) {
-                if (StringUtils.equals(property.getName(), LIMIT_PROPERTY_NAME)) {
-                    limit = property.getValue();
-                }
-            }
-            if (limit == null) {
-                throw new FalconException("Property: " + LIMIT_PROPERTY_NAME + " is required for "
-                        + getName() + " policy.");
-            }
-            try {
-                return new Frequency(limit);
-            } catch (IllegalArgumentException e) {
-                throw new FalconException("Invalid value for property: " + LIMIT_PROPERTY_NAME + ", should be a valid "
-                        + "frequency e.g. hours(2)", e);
-            }
-        } else {
-            throw new FalconException("Cluster " + clusterName + " doesn't contain a retention stage");
-        }
-    }
-
-}
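
The retention limit is carried as a feed property and parsed into a Frequency, so a caller typically validates before reading it. A minimal sketch, assuming the feed was loaded from the configuration store and declares a retention stage on the named cluster (the helper class is illustrative):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.v0.Frequency;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.lifecycle.retention.AgeBasedDelete;

    public final class RetentionLimitCheck {

        // Assumes 'feed' declares retention.policy.agebaseddelete.limit for
        // 'clusterName', e.g. "days(7)"; otherwise a FalconException is thrown.
        static Frequency checkedLimit(Feed feed, String clusterName) throws FalconException {
            AgeBasedDelete policy = new AgeBasedDelete();
            policy.validate(feed, clusterName);  // SLA, late-data and Oozie frequency checks
            return policy.getRetentionLimit(feed, clusterName);
        }
    }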

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
deleted file mode 100644
index 7fd6175..0000000
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/RetentionPolicy.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.lifecycle.retention;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.lifecycle.AbstractPolicyBuilderFactory;
-import org.apache.falcon.lifecycle.FeedLifecycleStage;
-import org.apache.falcon.lifecycle.LifecyclePolicy;
-import org.apache.falcon.lifecycle.PolicyBuilder;
-import org.apache.falcon.workflow.WorkflowEngineFactory;
-import org.apache.hadoop.fs.Path;
-
-import java.util.Properties;
-
-/**
- * Base class that all retention policies must extend.
- */
-public abstract class RetentionPolicy implements LifecyclePolicy {
-
-    @Override
-    public String getName() {
-        return this.getClass().getSimpleName();
-    }
-
-    @Override
-    public FeedLifecycleStage getStage() {
-        return FeedLifecycleStage.RETENTION;
-    }
-
-    @Override
-    public Properties build(Cluster cluster, Path buildPath, Feed feed) throws FalconException {
-        AbstractPolicyBuilderFactory factory = WorkflowEngineFactory.getLifecycleEngine();
-        PolicyBuilder builder = factory.getPolicyBuilder(getName());
-        return builder.build(cluster, buildPath, feed);
-    }
-}
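
Because getName(), getStage() and build() are inherited, a new retention policy only has to supply its validation rules. A minimal sketch of a hypothetical subclass (the class name is illustrative; a matching builder registered under that name would still be required in the lifecycle engine):

    import org.apache.falcon.FalconException;
    import org.apache.falcon.entity.v0.feed.Feed;
    import org.apache.falcon.lifecycle.retention.RetentionPolicy;

    public class ArchiveThenDelete extends RetentionPolicy {

        @Override
        public void validate(Feed feed, String clusterName) throws FalconException {
            // policy-specific checks on the feed's lifecycle configuration go here
        }
    }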

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
deleted file mode 100644
index 25bbf0c..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/EntityRelationshipGraphBuilder.java
+++ /dev/null
@@ -1,514 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.datasource.Datasource;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Entity metadata relationship mapping helper.
- */
-public class EntityRelationshipGraphBuilder extends RelationshipGraphBuilder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EntityRelationshipGraphBuilder.class);
-
-
-    public EntityRelationshipGraphBuilder(Graph graph, boolean preserveHistory) {
-        super(graph, preserveHistory);
-    }
-
-    public void addEntity(Entity entity) {
-        EntityType entityType = entity.getEntityType();
-        switch (entityType) {
-        case CLUSTER:
-            addClusterEntity((Cluster) entity);
-            break;
-        case PROCESS:
-            addProcessEntity((Process) entity);
-            break;
-        case FEED:
-            addFeedEntity((Feed) entity);
-            break;
-        case DATASOURCE:
-            addDatasourceEntity((Datasource) entity);
-            break;
-
-        default:
-            throw new IllegalArgumentException("Invalid EntityType " + entityType);
-        }
-    }
-
-    public void addClusterEntity(Cluster clusterEntity) {
-        LOG.info("Adding cluster entity: {}", clusterEntity.getName());
-        Vertex clusterVertex = addVertex(clusterEntity.getName(), RelationshipType.CLUSTER_ENTITY);
-
-        addUserRelation(clusterVertex);
-        addColoRelation(clusterEntity.getColo(), clusterVertex);
-        addDataClassification(clusterEntity.getTags(), clusterVertex);
-    }
-
-    public void addFeedEntity(Feed feed) {
-        LOG.info("Adding feed entity: {}", feed.getName());
-        Vertex feedVertex = addVertex(feed.getName(), RelationshipType.FEED_ENTITY);
-
-        addUserRelation(feedVertex);
-        addDataClassification(feed.getTags(), feedVertex);
-        addGroups(feed.getGroups(), feedVertex);
-
-        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
-            if (ClusterType.TARGET != feedCluster.getType()) {
-                addRelationToCluster(feedVertex, feedCluster.getName(), RelationshipLabel.FEED_CLUSTER_EDGE);
-            }
-        }
-
-        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
-            if (FeedHelper.isImportEnabled(feedCluster)) {
-                addRelationToDatasource(feedVertex, FeedHelper.getImportDatasourceName(feedCluster),
-                        RelationshipLabel.DATASOURCE_IMPORT_EDGE);
-            }
-        }
-    }
-
-    public void addDatasourceEntity(Datasource dsEntity) {
-        LOG.info("Adding datasource entity: {}", dsEntity.getName());
-        Vertex dsVertex = addVertex(dsEntity.getName(), RelationshipType.DATASOURCE_ENTITY);
-
-        addUserRelation(dsVertex);
-        addColoRelation(dsEntity.getColo(), dsVertex);
-        addDataClassification(dsEntity.getTags(), dsVertex);
-    }
-
-
-    public void updateEntity(Entity oldEntity, Entity newEntity) {
-        EntityType entityType = oldEntity.getEntityType();
-        switch (entityType) {
-        case CLUSTER:
-            // a cluster cannot be updated
-            break;
-        case PROCESS:
-            updateProcessEntity((Process) oldEntity, (Process) newEntity);
-            break;
-        case FEED:
-            updateFeedEntity((Feed) oldEntity, (Feed) newEntity);
-            break;
-        default:
-            throw new IllegalArgumentException("Invalid EntityType " + entityType);
-        }
-    }
-
-
-
-    public void updateFeedEntity(Feed oldFeed, Feed newFeed) {
-        LOG.info("Updating feed entity: {}", newFeed.getName());
-        Vertex feedEntityVertex = findVertex(oldFeed.getName(), RelationshipType.FEED_ENTITY);
-        if (feedEntityVertex == null) {
-            LOG.error("Illegal State: Feed entity vertex must exist for {}", oldFeed.getName());
-            throw new IllegalStateException(oldFeed.getName() + " entity vertex must exist.");
-        }
-
-        updateDataClassification(oldFeed.getTags(), newFeed.getTags(), feedEntityVertex);
-        updateGroups(oldFeed.getGroups(), newFeed.getGroups(), feedEntityVertex);
-        updateFeedClusters(oldFeed.getClusters().getClusters(),
-                newFeed.getClusters().getClusters(), feedEntityVertex);
-    }
-
-    public void addProcessEntity(Process process) {
-        String processName = process.getName();
-        LOG.info("Adding process entity: {}", processName);
-        Vertex processVertex = addVertex(processName, RelationshipType.PROCESS_ENTITY);
-        addWorkflowProperties(process.getWorkflow(), processVertex, processName);
-
-        addUserRelation(processVertex);
-        addDataClassification(process.getTags(), processVertex);
-        addPipelines(process.getPipelines(), processVertex);
-
-        for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
-            addRelationToCluster(processVertex, cluster.getName(), RelationshipLabel.PROCESS_CLUSTER_EDGE);
-        }
-
-        addInputFeeds(process.getInputs(), processVertex);
-        addOutputFeeds(process.getOutputs(), processVertex);
-    }
-
-    public void updateProcessEntity(Process oldProcess, Process newProcess) {
-        LOG.info("Updating process entity: {}", newProcess.getName());
-        Vertex processEntityVertex = findVertex(oldProcess.getName(), RelationshipType.PROCESS_ENTITY);
-        if (processEntityVertex == null) {
-            LOG.error("Illegal State: Process entity vertex must exist for {}", oldProcess.getName());
-            throw new IllegalStateException(oldProcess.getName() + " entity vertex must exist");
-        }
-
-        updateWorkflowProperties(oldProcess.getWorkflow(), newProcess.getWorkflow(),
-                processEntityVertex, newProcess.getName());
-        updateDataClassification(oldProcess.getTags(), newProcess.getTags(), processEntityVertex);
-        updatePipelines(oldProcess.getPipelines(), newProcess.getPipelines(), processEntityVertex);
-        updateProcessClusters(oldProcess.getClusters().getClusters(),
-                newProcess.getClusters().getClusters(), processEntityVertex);
-        updateProcessInputs(oldProcess.getInputs(), newProcess.getInputs(), processEntityVertex);
-        updateProcessOutputs(oldProcess.getOutputs(), newProcess.getOutputs(), processEntityVertex);
-    }
-
-    public void addColoRelation(String colo, Vertex fromVertex) {
-        Vertex coloVertex = addVertex(colo, RelationshipType.COLO);
-        addEdge(fromVertex, coloVertex, RelationshipLabel.CLUSTER_COLO.getName());
-    }
-
-    public void addRelationToCluster(Vertex fromVertex, String clusterName, RelationshipLabel edgeLabel) {
-        Vertex clusterVertex = findVertex(clusterName, RelationshipType.CLUSTER_ENTITY);
-        if (clusterVertex == null) { // cluster must exist before adding other entities
-            LOG.error("Illegal State: Cluster entity vertex must exist for {}", clusterName);
-            throw new IllegalStateException("Cluster entity vertex must exist: " + clusterName);
-        }
-
-        addEdge(fromVertex, clusterVertex, edgeLabel.getName());
-    }
-
-    public void addRelationToDatasource(Vertex fromVertex, String datasourceName, RelationshipLabel edgeLabel) {
-        Vertex datasourceVertex = findVertex(datasourceName, RelationshipType.DATASOURCE_ENTITY);
-        if (datasourceVertex == null) { // datasource must exist before entities can reference it
-            LOG.error("Illegal State: Datasource entity vertex must exist for {}", datasourceName);
-            throw new IllegalStateException("Datasource entity vertex must exist: " + datasourceName);
-        }
-
-        addEdge(fromVertex, datasourceVertex, edgeLabel.getName());
-    }
-
-    public void addInputFeeds(Inputs inputs, Vertex processVertex) {
-        if (inputs == null) {
-            return;
-        }
-
-        for (Input input : inputs.getInputs()) {
-            addProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE);
-        }
-    }
-
-    public void addOutputFeeds(Outputs outputs, Vertex processVertex) {
-        if (outputs == null) {
-            return;
-        }
-
-        for (Output output : outputs.getOutputs()) {
-            addProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE);
-        }
-    }
-
-    public void addProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
-        Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
-        if (feedVertex == null) {
-            LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
-            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
-        }
-
-        addProcessFeedEdge(processVertex, feedVertex, edgeLabel);
-    }
-
-    public void addWorkflowProperties(Workflow workflow, Vertex processVertex, String processName) {
-        processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(),
-                ProcessHelper.getProcessWorkflowName(workflow.getName(), processName));
-        processVertex.setProperty(RelationshipProperty.VERSION.getName(), workflow.getVersion());
-        processVertex.setProperty(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(),
-                workflow.getEngine().value());
-    }
-
-    public void updateWorkflowProperties(Workflow oldWorkflow, Workflow newWorkflow,
-                                         Vertex processEntityVertex, String processName) {
-        if (areSame(oldWorkflow, newWorkflow)) {
-            return;
-        }
-
-        LOG.info("Updating workflow properties for: {}", processEntityVertex);
-        addWorkflowProperties(newWorkflow, processEntityVertex, processName);
-    }
-
-    public void updateDataClassification(String oldClassification, String newClassification,
-                                         Vertex entityVertex) {
-        if (areSame(oldClassification, newClassification)) {
-            return;
-        }
-
-        removeDataClassification(oldClassification, entityVertex);
-        addDataClassification(newClassification, entityVertex);
-    }
-
-    private void removeDataClassification(String classification, Vertex entityVertex) {
-        if (classification == null || classification.length() == 0) {
-            return;
-        }
-
-        String[] oldTags = classification.split(",");
-        for (String oldTag : oldTags) {
-            int index = oldTag.indexOf("=");
-            String tagKey = oldTag.substring(0, index);
-            String tagValue = oldTag.substring(index + 1);
-
-            removeEdge(entityVertex, tagValue, tagKey);
-        }
-    }
-
-    public void updateGroups(String oldGroups, String newGroups, Vertex entityVertex) {
-        if (areSame(oldGroups, newGroups)) {
-            return;
-        }
-
-        removeGroups(oldGroups, entityVertex);
-        addGroups(newGroups, entityVertex);
-    }
-
-    public void updatePipelines(String oldPipelines, String newPipelines, Vertex entityVertex) {
-        if (areSame(oldPipelines, newPipelines)) {
-            return;
-        }
-
-        removePipelines(oldPipelines, entityVertex);
-        addPipelines(newPipelines, entityVertex);
-    }
-
-    private void removeGroups(String groups, Vertex entityVertex) {
-        removeGroupsOrPipelines(groups, entityVertex, RelationshipLabel.GROUPS);
-    }
-
-    private void removePipelines(String pipelines, Vertex entityVertex) {
-        removeGroupsOrPipelines(pipelines, entityVertex, RelationshipLabel.PIPELINES);
-    }
-
-    private void removeGroupsOrPipelines(String groupsOrPipelines, Vertex entityVertex,
-                                         RelationshipLabel edgeLabel) {
-        if (StringUtils.isEmpty(groupsOrPipelines)) {
-            return;
-        }
-
-        String[] oldGroupOrPipelinesTags = groupsOrPipelines.split(",");
-        for (String groupOrPipelineTag : oldGroupOrPipelinesTags) {
-            removeEdge(entityVertex, groupOrPipelineTag, edgeLabel.getName());
-        }
-    }
-
-    public static boolean areSame(String oldValue, String newValue) {
-        return oldValue == null && newValue == null
-                || oldValue != null && newValue != null && oldValue.equals(newValue);
-    }
-
-    public void updateFeedClusters(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters,
-                                   List<org.apache.falcon.entity.v0.feed.Cluster> newClusters,
-                                   Vertex feedEntityVertex) {
-        if (areFeedClustersSame(oldClusters, newClusters)) {
-            return;
-        }
-
-        // remove edges to old clusters
-        for (org.apache.falcon.entity.v0.feed.Cluster oldCluster : oldClusters) {
-            if (ClusterType.TARGET != oldCluster.getType()) {
-                removeEdge(feedEntityVertex, oldCluster.getName(),
-                        RelationshipLabel.FEED_CLUSTER_EDGE.getName());
-            }
-        }
-
-        // add edges to new clusters
-        for (org.apache.falcon.entity.v0.feed.Cluster newCluster : newClusters) {
-            if (ClusterType.TARGET != newCluster.getType()) {
-                addRelationToCluster(feedEntityVertex, newCluster.getName(),
-                        RelationshipLabel.FEED_CLUSTER_EDGE);
-            }
-        }
-    }
-
-    public boolean areFeedClustersSame(List<org.apache.falcon.entity.v0.feed.Cluster> oldClusters,
-                                       List<org.apache.falcon.entity.v0.feed.Cluster> newClusters) {
-        if (oldClusters.size() != newClusters.size()) {
-            return false;
-        }
-
-        List<String> oldClusterNames = getFeedClusterNames(oldClusters);
-        List<String> newClusterNames = getFeedClusterNames(newClusters);
-
-        return oldClusterNames.size() == newClusterNames.size()
-                && oldClusterNames.containsAll(newClusterNames)
-                && newClusterNames.containsAll(oldClusterNames);
-    }
-
-    public List<String> getFeedClusterNames(List<org.apache.falcon.entity.v0.feed.Cluster> clusters) {
-        List<String> clusterNames = new ArrayList<String>(clusters.size());
-        for (org.apache.falcon.entity.v0.feed.Cluster cluster : clusters) {
-            clusterNames.add(cluster.getName());
-        }
-
-        return clusterNames;
-    }
-
-    public void updateProcessClusters(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters,
-                                      List<org.apache.falcon.entity.v0.process.Cluster> newClusters,
-                                      Vertex processEntityVertex) {
-        if (areProcessClustersSame(oldClusters, newClusters)) {
-            return;
-        }
-
-        // remove old clusters
-        for (org.apache.falcon.entity.v0.process.Cluster oldCluster : oldClusters) {
-            removeEdge(processEntityVertex, oldCluster.getName(),
-                    RelationshipLabel.PROCESS_CLUSTER_EDGE.getName());
-        }
-
-        // add new clusters
-        for (org.apache.falcon.entity.v0.process.Cluster newCluster : newClusters) {
-            addRelationToCluster(processEntityVertex, newCluster.getName(),
-                    RelationshipLabel.PROCESS_CLUSTER_EDGE);
-        }
-    }
-
-    public boolean areProcessClustersSame(List<org.apache.falcon.entity.v0.process.Cluster> oldClusters,
-                                          List<org.apache.falcon.entity.v0.process.Cluster> newClusters) {
-        if (oldClusters.size() != newClusters.size()) {
-            return false;
-        }
-
-        List<String> oldClusterNames = getProcessClusterNames(oldClusters);
-        List<String> newClusterNames = getProcessClusterNames(newClusters);
-
-        return oldClusterNames.size() == newClusterNames.size()
-                && oldClusterNames.containsAll(newClusterNames)
-                && newClusterNames.containsAll(oldClusterNames);
-    }
-
-    public List<String> getProcessClusterNames(List<org.apache.falcon.entity.v0.process.Cluster> clusters) {
-        List<String> clusterNames = new ArrayList<String>(clusters.size());
-        for (org.apache.falcon.entity.v0.process.Cluster cluster : clusters) {
-            clusterNames.add(cluster.getName());
-        }
-
-        return clusterNames;
-    }
-
-    public static boolean areSame(Workflow oldWorkflow, Workflow newWorkflow) {
-        return areSame(oldWorkflow.getName(), newWorkflow.getName())
-                && areSame(oldWorkflow.getVersion(), newWorkflow.getVersion())
-                && areSame(oldWorkflow.getEngine().value(), newWorkflow.getEngine().value());
-    }
-
-    private void updateProcessInputs(Inputs oldProcessInputs, Inputs newProcessInputs,
-                                     Vertex processEntityVertex) {
-        if (areSame(oldProcessInputs, newProcessInputs)) {
-            return;
-        }
-
-        removeInputFeeds(oldProcessInputs, processEntityVertex);
-        addInputFeeds(newProcessInputs, processEntityVertex);
-    }
-
-    public static boolean areSame(Inputs oldProcessInputs, Inputs newProcessInputs) {
-        if (oldProcessInputs == null && newProcessInputs == null) {
-            return true;
-        }
-
-        if (oldProcessInputs == null || newProcessInputs == null
-                || oldProcessInputs.getInputs().size() != newProcessInputs.getInputs().size()) {
-            return false;
-        }
-
-        List<Input> oldInputs = oldProcessInputs.getInputs();
-        List<Input> newInputs = newProcessInputs.getInputs();
-
-        return oldInputs.size() == newInputs.size()
-                && oldInputs.containsAll(newInputs)
-                && newInputs.containsAll(oldInputs);
-    }
-
-    public void removeInputFeeds(Inputs inputs, Vertex processVertex) {
-        if (inputs == null) {
-            return;
-        }
-
-        for (Input input : inputs.getInputs()) {
-            removeProcessFeedEdge(processVertex, input.getFeed(), RelationshipLabel.FEED_PROCESS_EDGE);
-        }
-    }
-
-    public void removeOutputFeeds(Outputs outputs, Vertex processVertex) {
-        if (outputs == null) {
-            return;
-        }
-
-        for (Output output : outputs.getOutputs()) {
-            removeProcessFeedEdge(processVertex, output.getFeed(), RelationshipLabel.PROCESS_FEED_EDGE);
-        }
-    }
-
-    public void removeProcessFeedEdge(Vertex processVertex, String feedName, RelationshipLabel edgeLabel) {
-        Vertex feedVertex = findVertex(feedName, RelationshipType.FEED_ENTITY);
-        if (feedVertex == null) {
-            LOG.error("Illegal State: Feed entity vertex must exist for {}", feedName);
-            throw new IllegalStateException("Feed entity vertex must exist: " + feedName);
-        }
-
-        if (edgeLabel == RelationshipLabel.FEED_PROCESS_EDGE) {
-            removeEdge(feedVertex, processVertex, edgeLabel.getName());
-        } else {
-            removeEdge(processVertex, feedVertex, edgeLabel.getName());
-        }
-    }
-
-    private void updateProcessOutputs(Outputs oldProcessOutputs, Outputs newProcessOutputs,
-                                      Vertex processEntityVertex) {
-        if (areSame(oldProcessOutputs, newProcessOutputs)) {
-            return;
-        }
-
-        removeOutputFeeds(oldProcessOutputs, processEntityVertex);
-        addOutputFeeds(newProcessOutputs, processEntityVertex);
-    }
-
-    public static boolean areSame(Outputs oldProcessOutputs, Outputs newProcessOutputs) {
-        if (oldProcessOutputs == null && newProcessOutputs == null) {
-            return true;
-        }
-
-        if (oldProcessOutputs == null || newProcessOutputs == null
-                || oldProcessOutputs.getOutputs().size() != newProcessOutputs.getOutputs().size()) {
-            return false;
-        }
-
-        List<Output> oldOutputs = oldProcessOutputs.getOutputs();
-        List<Output> newOutputs = newProcessOutputs.getOutputs();
-
-        return oldOutputs.size() == newOutputs.size()
-                && oldOutputs.containsAll(newOutputs)
-                && newOutputs.containsAll(oldOutputs);
-    }
-}
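
The classification handling above assumes tags arrive as comma-separated key=value pairs; each key becomes an edge label and each value a tag vertex. A standalone sketch of that parsing contract (the tag string is illustrative):

    public final class TagFormatDemo {
        public static void main(String[] args) {
            // Mirrors the split/indexOf logic in removeDataClassification:
            // comma-separated key=value pairs.
            String tags = "externalSystem=billing,classification=secure";
            for (String tag : tags.split(",")) {
                int index = tag.indexOf('=');
                String tagKey = tag.substring(0, index);     // becomes the edge label
                String tagValue = tag.substring(index + 1);  // becomes the tag vertex name
                System.out.println(tagKey + " -> " + tagValue);
            }
        }
    }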

http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java b/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
deleted file mode 100644
index 8bec02f..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/GraphUtils.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-import com.tinkerpop.blueprints.Direction;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.Vertex;
-import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Utility class for graph operations.
- */
-public final class GraphUtils {
-
-    private static final Logger LOG = LoggerFactory.getLogger(GraphUtils.class);
-
-    private GraphUtils() {
-    }
-
-    public static void dumpToLog(final Graph graph) {
-        LOG.debug("Vertices of {}", graph);
-        for (Vertex vertex : graph.getVertices()) {
-            LOG.debug(vertexString(vertex));
-        }
-
-        LOG.debug("Edges of {}", graph);
-        for (Edge edge : graph.getEdges()) {
-            LOG.debug(edgeString(edge));
-        }
-    }
-
-    public static void dump(final Graph graph) throws IOException {
-        dump(graph, System.out);
-    }
-
-    public static void dump(final Graph graph, OutputStream outputStream) throws IOException {
-        GraphSONWriter.outputGraph(graph, outputStream);
-    }
-
-    public static void dump(final Graph graph, String fileName) throws IOException {
-        GraphSONWriter.outputGraph(graph, fileName);
-    }
-
-    public static String vertexString(final Vertex vertex) {
-        StringBuilder properties = new StringBuilder();
-        for (String propertyKey : vertex.getPropertyKeys()) {
-            properties.append(propertyKey)
-                    .append("=").append(vertex.getProperty(propertyKey))
-                    .append(", ");
-        }
-
-        return "v[" + vertex.getId() + "], Properties[" + properties + "]";
-    }
-
-    public static String edgeString(final Edge edge) {
-        return "e[" + edge.getLabel() + "], ["
-                + edge.getVertex(Direction.OUT).getProperty("name")
-                + " -> " + edge.getLabel() + " -> "
-                + edge.getVertex(Direction.IN).getProperty("name")
-                + "]";
-    }
-}
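
These utilities work on any Blueprints Graph. A minimal sketch using the in-memory TinkerGraph (the vertex names are illustrative; edgeString() reads the "name" property of each endpoint, so the demo sets it):

    import com.tinkerpop.blueprints.Graph;
    import com.tinkerpop.blueprints.Vertex;
    import com.tinkerpop.blueprints.impls.tg.TinkerGraph;
    import org.apache.falcon.metadata.GraphUtils;

    import java.io.IOException;

    public final class GraphUtilsDemo {
        public static void main(String[] args) throws IOException {
            Graph graph = new TinkerGraph();  // in-memory Blueprints 2.x graph

            Vertex feed = graph.addVertex(null);
            feed.setProperty("name", "raw-logs");

            Vertex cluster = graph.addVertex(null);
            cluster.setProperty("name", "primary-cluster");

            graph.addEdge(null, feed, cluster, "stored-in");

            GraphUtils.dumpToLog(graph);         // vertices and edges at DEBUG level
            GraphUtils.dump(graph, System.out);  // GraphSON to stdout
        }
    }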