You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2014/12/26 06:32:11 UTC

[1/3] incubator-falcon git commit: FALCON-823 Add path matching ability to the radix tree. Contributed by Ajay Yadav

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 9827ac9e1 -> 463f85489


FALCON-823 Add path matching ability to the radix tree. Contributed by Ajay Yadav


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/06ffdf91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/06ffdf91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/06ffdf91

Branch: refs/heads/master
Commit: 06ffdf913fed07dcb8eee81f4aa0f0779b051ed0
Parents: 9827ac9
Author: srikanth.sundarrajan <sr...@apache.org>
Authored: Fri Dec 26 10:34:16 2014 +0530
Committer: srikanth.sundarrajan <sr...@apache.org>
Committed: Fri Dec 26 10:34:16 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../falcon/entity/common/FeedDataPath.java      |  15 +-
 .../falcon/entity/store/FeedPathStore.java      |   5 +
 .../apache/falcon/util/FalconRadixUtils.java    | 314 +++++++++++++++++++
 .../java/org/apache/falcon/util/RadixNode.java  |  23 ++
 .../java/org/apache/falcon/util/RadixTree.java  |  38 ++-
 .../apache/falcon/entity/FeedDataPathTest.java  | 124 ++++++++
 .../org/apache/falcon/util/RadixNodeTest.java   |  18 +-
 .../org/apache/falcon/util/RadixTreeTest.java   |  22 ++
 9 files changed, 544 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5d529b9..af4bd9e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
   NEW FEATURES
 
   IMPROVEMENTS
+   FALCON-823 Add path matching ability to the radix tree. (Ajay Yadav
+   via Srikanth Sundarrajan)
+
    FALCON-329 Falcon client methods should return objects. (Samar via Shwetha GS)
 
    FALCON-593 Preserve data type for properties in a vertex. (Ajay

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
index 39e636b..6ededbb 100644
--- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
+++ b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
@@ -30,14 +30,25 @@ public final class FeedDataPath {
      * Standard variables for feed time components.
      */
     public static enum VARS {
-        YEAR("yyyy"), MONTH("MM"), DAY("dd"), HOUR("HH"), MINUTE("mm");
+        YEAR("yyyy", "([0-9]{4})"), MONTH("MM", "(0[1-9]|1[0-2])"), DAY("dd", "(0[1-9]|1[0-9]|2[0-9]|3[0-1])"),
+        HOUR("HH", "([0-1][0-9]|2[0-4])"), MINUTE("mm", "([0-5][0-9]|60)");
 
         private final Pattern pattern;
         private final String datePattern;
+        private final String patternRegularExpression;
 
-        private VARS(String datePattern) {
+        private VARS(String datePattern, String patternRegularExpression) {
             pattern = Pattern.compile("\\$\\{" + name() + "\\}");
             this.datePattern = datePattern;
+            this.patternRegularExpression = patternRegularExpression;
+        }
+
+        public String getPatternRegularExpression() {
+            return patternRegularExpression;
+        }
+
+        public String getDatePattern() {
+            return datePattern;
         }
 
         public String regex() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
index 97d21c1..1be12fe 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
@@ -18,6 +18,8 @@
 
 package org.apache.falcon.entity.store;
 
+import org.apache.falcon.util.FalconRadixUtils;
+
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.Collection;
@@ -34,6 +36,9 @@ public interface FeedPathStore<T> {
     int getSize();
 
     @Nullable
+    Collection<T> find(@Nonnull String key, @Nullable FalconRadixUtils.INodeAlgorithm algorithm);
+
+    @Nullable
     Collection<T> find(@Nonnull String key);
 
     boolean delete(@Nonnull String key, @Nonnull T value);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
new file mode 100644
index 0000000..bbd73c7
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.entity.common.FeedDataPath;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Falcon specific utilities for the Radix Tree.
+ */
+public final class FalconRadixUtils {
+
+    private FalconRadixUtils() {
+        // only a holder for the node matching algorithms, never instantiated
+    }
+
+    /**
+     * This interface implements the various algorithms to compare node's key with input based on whether you want
+     * a regular expression based algorithm or a character by character matching algorithm.
+     */
+    public interface INodeAlgorithm {
+
+        /**
+         * Checks if the given key and input match.
+         * @param key key of the node
+         * @param input input String to be matched against key.
+         * @return true if key and input match.
+         */
+        boolean match(String key, String input);
+
+        /**
+         * Checks if the given key of a node is a prefix of the input.
+         * @param key key of the node
+         * @param input input String whose beginning is compared against key.
+         * @return true if input begins with a match of key.
+         */
+        boolean startsWith(String key, String input);
+
+        /**
+         * Finds next node to take for traversal among currentNode's children.
+         * @param currentNode of RadixTree which has been matched.
+         * @param input input String to be searched.
+         * @return Node to be traversed next, or null if no child can continue the match.
+         */
+        RadixNode getNextCandidate(RadixNode currentNode, String input);
+
+        /**
+         * For the given node and input key, finds the remaining text to be matched with the child sub tree.
+         * @param currentNode node of RadixTree which has been matched.
+         * @param key input String to be searched.
+         * @return the part of key which follows the part consumed by currentNode's key.
+         */
+        String getRemainingText(RadixNode currentNode, String key);
+    }
+
+    /**
+     * This Algorithm does a plain string comparison for all
+     * type of operations on a node.
+     */
+    static class StringAlgorithm implements INodeAlgorithm {
+
+        @Override
+        public boolean match(String key, String input) {
+            return StringUtils.equals(key, input);
+        }
+
+        @Override
+        public boolean startsWith(String nodeKey, String inputKey) {
+            return inputKey.startsWith(nodeKey);
+        }
+
+        /**
+         * Picks the child whose key begins with the first character left after currentNode's key.
+         * Returns null when no text is left, instead of failing on an empty string.
+         */
+        @Override
+        public RadixNode getNextCandidate(RadixNode currentNode, String input) {
+            String remainingText = getRemainingText(currentNode, input);
+            if (remainingText.isEmpty()) {
+                return null;
+            }
+            List<RadixNode> children = currentNode.getChildren();
+            for (RadixNode child : children) {
+                String childKey = child.getKey();
+                if (StringUtils.isNotEmpty(childKey) && childKey.charAt(0) == remainingText.charAt(0)) {
+                    return child;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String getRemainingText(RadixNode currentNode, String key) {
+            return key.substring(currentNode.getKey().length());
+        }
+    }
+
+    /**
+     * This Algorithm matches a feed's path template (the node's key) with a feed instance's path (the input).
+     * Variable parts of the template such as ${YEAR} are matched with the regular expression of the
+     * corresponding {@link FeedDataPath.VARS}; all other parts are compared character by character.
+     */
+    static class FeedRegexAlgorithm implements INodeAlgorithm {
+
+        /** Regular expression of every feed path variable, compiled once instead of on every comparison. */
+        private static final Map<FeedDataPath.VARS, Pattern> VAR_PATTERNS = compileVarPatterns();
+
+        private static Map<FeedDataPath.VARS, Pattern> compileVarPatterns() {
+            Map<FeedDataPath.VARS, Pattern> patterns =
+                new EnumMap<FeedDataPath.VARS, Pattern>(FeedDataPath.VARS.class);
+            for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
+                patterns.put(var, Pattern.compile(var.getPatternRegularExpression()));
+            }
+            return Collections.unmodifiableMap(patterns);
+        }
+
+        /**
+         * This function matches a feed path template with feed instance's path string.
+         *
+         * Key is assumed to be a feed's path template and inputString is assumed to be an instance's path string.
+         * Variable/Regex parts of the feed's template are matched against the corresponding parts in inputString
+         * using regular expression and for other parts a character by character match is performed.
+         * e.g. Given templateString (/data/cas/${YEAR}/${MONTH}/${DAY}) and inputString (/data/cas/2014/09/09)
+         * the function will return true.
+         * @param templateString Node's key (Feed's template path)
+         * @param inputString inputString String to be matched against templateString(instance's path)
+         * @return true if the templateString and inputString match, false otherwise.
+         */
+        @Override
+        public boolean match(String templateString, String inputString) {
+            if (StringUtils.isBlank(templateString)) {
+                return false;
+            }
+            // Divide the templateString and inputString into templateParts of regex and character matches
+            List<String> templateParts = getPartsInPathTemplate(templateString);
+            List<String> inputStringParts = getCorrespondingParts(inputString, templateParts);
+
+            if (inputStringParts.size() != templateParts.size()) {
+                return false;
+            }
+
+            for (int i = 0; i < templateParts.size(); i++) {
+                if (!matchPart(templateParts.get(i), inputStringParts.get(i))) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Finds if the current node's key is a prefix of the given inputString or not.
+         *
+         * @param inputTemplate inputTemplate String
+         * @param inputString inputString to be checked
+         * @return true if inputString starts with inputTemplate, false otherwise.
+         */
+        @Override
+        public boolean startsWith(String inputTemplate, String inputString) {
+            if (StringUtils.isBlank(inputString)) {
+                return false;
+            }
+            if (StringUtils.isBlank(inputTemplate)) {
+                return true;
+            }
+
+            // divide inputTemplate and inputString into corresponding templateParts of regex and character only strings
+            List<String> templateParts = getPartsInPathTemplate(inputTemplate);
+            List<String> remainingPattern = getCorrespondingParts(inputString, templateParts);
+
+            if (templateParts.size() > remainingPattern.size()) {
+                return false;
+            }
+
+            // compare part by part till the templateParts end
+            for (int i = 0; i < templateParts.size(); i++) {
+                if (!matchPart(templateParts.get(i), remainingPattern.get(i))) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Finds the child of currentNode which can continue matching the text left after currentNode's key.
+         *
+         * A child whose key starts with a variable such as ${YEAR} is chosen when the leading characters of the
+         * remaining text match that variable's regular expression; any other child is chosen when its first
+         * character equals the first remaining character.
+         * @param currentNode node of RadixTree which has been matched.
+         * @param input input String to be searched.
+         * @return Node to be traversed next, or null if no child can continue the match.
+         */
+        @Override
+        public RadixNode getNextCandidate(RadixNode currentNode, String input) {
+            String remainingText = getRemainingText(currentNode, input);
+            if (remainingText.isEmpty()) {
+                return null;
+            }
+            List<RadixNode> children = currentNode.getChildren();
+            for (RadixNode child : children) {
+                String key = child.getKey();
+                if (StringUtils.isEmpty(key)) {
+                    continue;
+                }
+                int variableEnd = key.indexOf('}');
+                if (key.startsWith("${") && variableEnd > 0) {
+                    // child keys are relative to this node, so the child's leading variable (e.g. ${YEAR})
+                    // must be matched against the remaining text, not against the start of the whole input
+                    if (matchPart(key.substring(0, variableEnd + 1), remainingText)) {
+                        return child;
+                    }
+                } else if (key.charAt(0) == remainingText.charAt(0)) {
+                    return child;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        public String getRemainingText(RadixNode currentNode, String inputString) {
+            // skip the part consumed by the node's key, where each variable spans the length of its date pattern
+            return inputString.substring(getPatternsEffectiveLength(currentNode.getKey()));
+        }
+
+        private int getPatternsEffectiveLength(String templateString) {
+            if (StringUtils.isEmpty(templateString)) {
+                return 0;
+            }
+            // a variable such as ${YEAR} spans as many characters as its date pattern (yyyy)
+            for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
+                templateString = templateString.replace("${" + var.name() + "}", var.getDatePattern());
+            }
+            return templateString.length();
+        }
+
+        /**
+         * Divide a given template string into parts of regex and character strings
+         * e.g. /data/cas/${YEAR}/${MONTH}/${DAY} will be converted to
+         * [/data/cas/, ${YEAR}, /, ${MONTH}, /, ${DAY}]
+         * @param templateString input string representing a feed's path template
+         * @return list of parts in input templateString which are either completely regex or normal string.
+         */
+        private List<String> getPartsInPathTemplate(String templateString) {
+            //divide the node's templateString in parts of regular expression and normal string
+            List<String> parts = new ArrayList<String>();
+            Matcher matcher = FeedDataPath.PATTERN.matcher(templateString);
+            int currentIndex = 0;
+            while (matcher.find()) {
+                parts.add(templateString.substring(currentIndex, matcher.start()));
+                parts.add(matcher.group());
+                currentIndex = matcher.end();
+            }
+            if (currentIndex != templateString.length()) {
+                parts.add(templateString.substring(currentIndex));
+            }
+            return Collections.unmodifiableList(parts);
+        }
+
+        /**
+         * Finds the feed path variable named by a template part such as ${YEAR}.
+         * @param inputPart template part of the form ${NAME}
+         * @return the matching variable, or null if no variable has that name.
+         */
+        private FeedDataPath.VARS getMatchingRegex(String inputPart) {
+            //inputPart will be something like ${YEAR}
+            inputPart = inputPart.replace("${", "\\$\\{");
+            inputPart = inputPart.replace("}", "\\}");
+
+            for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
+                if (StringUtils.equals(inputPart, var.regex())) {
+                    return var;
+                }
+            }
+            return null;
+        }
+
+        /**
+         * Divides a string into corresponding parts for the template to carry out comparison.
+         * templateParts = [/data/cas/, ${YEAR}, /, ${MONTH}, /, ${DAY}]
+         * inputString = /data/cas/2014/09/09
+         * returns [/data/cas/, 2014, /, 09, /, 09]
+         * Text left over after the last template part, even whitespace, is returned as one extra part so that
+         * callers can tell that the input is longer than the template.
+         * @param inputString normal string representing feed instance path
+         * @param templateParts parts of feed's path template broken into regex and non-regex units.
+         * @return a list of strings where each part of the list corresponds to a part in list of template parts.
+         */
+        private List<String> getCorrespondingParts(String inputString, List<String> templateParts) {
+            List<String> stringParts = new ArrayList<String>();
+            int counter = 0;
+            while (StringUtils.isNotEmpty(inputString) && counter < templateParts.size()) {
+                String currentTemplatePart = templateParts.get(counter);
+                int length = Math.min(getPatternsEffectiveLength(currentTemplatePart), inputString.length());
+                stringParts.add(inputString.substring(0, length));
+                inputString = inputString.substring(length);
+                counter++;
+            }
+            if (StringUtils.isNotEmpty(inputString)) {
+                stringParts.add(inputString);
+            }
+            return stringParts;
+        }
+
+        /**
+         * Compare a pure regex or pure string part with a given string.
+         *
+         * @param template template part, which can either be a pure regex or pure non-regex string.
+         * @param input input String to be matched against the template part.
+         * @return true if the input string matches the template, in case of a regex component a regex comparison is
+         * made, else a character by character comparison is made. An unknown variable, or an input shorter than
+         * the variable's date pattern, never matches.
+         */
+        private boolean matchPart(String template, String input) {
+            if (!template.startsWith("${")) {
+                // do exact match with normal strings
+                return input.startsWith(template);
+            }
+            // the part begins with ${ so it's a variable, do regex match
+            FeedDataPath.VARS var = getMatchingRegex(template);
+            if (var == null) {
+                return false;
+            }
+            // only the leading characters covered by the variable's date pattern (e.g. yyyy) are compared
+            int length = var.getDatePattern().length();
+            if (input.length() < length) {
+                return false;
+            }
+            return VAR_PATTERNS.get(var).matcher(input.substring(0, length)).matches();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/util/RadixNode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/RadixNode.java b/common/src/main/java/org/apache/falcon/util/RadixNode.java
index 564df8e..12227cb 100644
--- a/common/src/main/java/org/apache/falcon/util/RadixNode.java
+++ b/common/src/main/java/org/apache/falcon/util/RadixNode.java
@@ -124,4 +124,27 @@ public class RadixNode<T> {
 
         return matchLength;
     }
+
+
+    /**
+     * Checks whether the key of this node matches the given input.
+     *
+     * Without a matcher a plain string comparison is done, otherwise the matcher decides (e.g. a regular
+     * expression match of a feed instance path with a feed location template). A null input never matches.
+     * @param input input string to be matched with the key of the node.
+     * @param matcher A custom matcher algorithm to match node's key against the input. It is used when matching
+     *                path of a Feed's instance to Feed's path template. May be null.
+     * @return true if the node's key matches the input, false otherwise.
+     */
+    public boolean matches(String input, FalconRadixUtils.INodeAlgorithm matcher) {
+        if (input == null) {
+            return false;
+        }
+
+        if (matcher == null) {
+            return StringUtils.equals(getKey(), input);
+        }
+
+        return matcher.match(this.getKey(), input);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/util/RadixTree.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/RadixTree.java b/common/src/main/java/org/apache/falcon/util/RadixTree.java
index 6dbe160..6cd79f5 100644
--- a/common/src/main/java/org/apache/falcon/util/RadixTree.java
+++ b/common/src/main/java/org/apache/falcon/util/RadixTree.java
@@ -189,21 +189,35 @@ public class RadixTree<T> implements FeedPathStore<T>, Formattable {
      */
     @Override
     @Nullable
-    public synchronized Collection<T> find(@Nonnull String key) {
-        if (key != null && !key.trim().isEmpty()){
-            return recursiveFind(key.trim(), root);
+    public synchronized Collection<T> find(@Nonnull String key, FalconRadixUtils.INodeAlgorithm algorithm) {
+        if (key != null && !key.trim().isEmpty()) {
+            if (algorithm == null) {
+                algorithm = new FalconRadixUtils.StringAlgorithm();
+            }
+            return recursiveFind(key.trim(), root, algorithm);
+        }
+        return null;
+    }
+
+    @Nullable
+    @Override
+    public synchronized Collection<T> find(@Nonnull String key) {
+        if (key != null && !key.trim().isEmpty()) {
+            // synchronized like find(key, algorithm): both lookups share one lock; plain string matching here
+            return recursiveFind(key.trim(), root, new FalconRadixUtils.StringAlgorithm());
         }
         return null;
     }
 
-    private Collection<T> recursiveFind(String key, RadixNode<T> currentNode){
+    private Collection<T> recursiveFind(String key, RadixNode<T> currentNode,
+        FalconRadixUtils.INodeAlgorithm algorithm){
 
-        if (!key.startsWith(currentNode.getKey())){
+        if (!algorithm.startsWith(currentNode.getKey(), key)){
             LOG.debug("Current Node key: {} is not a prefix in the input key: {}", currentNode.getKey(), key);
             return null;
         }
 
-        if (StringUtils.equals(key, currentNode.getKey())){
+        if (algorithm.match(currentNode.getKey(), key)){
             if (currentNode.isTerminal()){
                 LOG.debug("Found the terminal node with key: {} for the given input.", currentNode.getKey());
                 return currentNode.getValues();
@@ -214,21 +228,15 @@ public class RadixTree<T> implements FeedPathStore<T>, Formattable {
         }
 
         //find child to follow, using remaining Text
-        RadixNode<T> newRoot = null;
-        String remainingText = key.substring(currentNode.getKey().length());
-        for(RadixNode<T> child : currentNode.getChildren()){
-            if (child.getKey().charAt(0) == remainingText.charAt(0)){
-                newRoot = child;
-                break;
-            }
-        }
+        RadixNode<T> newRoot = algorithm.getNextCandidate(currentNode, key);
+        String remainingText = algorithm.getRemainingText(currentNode, key);
 
         if (newRoot == null){
             LOG.debug("No child found to follow for further processing. Current node key {}");
             return null;
         }else {
             LOG.debug("Recursing with new key: {} and new remainingText: {}", newRoot.getKey(), remainingText);
-            return recursiveFind(remainingText, newRoot);
+            return recursiveFind(remainingText, newRoot, algorithm);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
new file mode 100644
index 0000000..c405556
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.common.FeedDataPath;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the regular expressions of the feed path variables in {@link FeedDataPath.VARS}.
+ */
+public class FeedDataPathTest {
+
+    @Test
+    public void testMinutesRegularExpression() {
+        String minutePattern = FeedDataPath.VARS.MINUTE.getPatternRegularExpression();
+        Assert.assertFalse("0".matches(minutePattern));
+        Assert.assertFalse("1".matches(minutePattern));
+        Assert.assertFalse("61".matches(minutePattern));
+        Assert.assertFalse("010".matches(minutePattern));
+        Assert.assertFalse("10 ".matches(minutePattern));
+        Assert.assertFalse(" 10".matches(minutePattern));
+
+
+        Assert.assertTrue("00".matches(minutePattern));
+        Assert.assertTrue("01".matches(minutePattern));
+        Assert.assertTrue("60".matches(minutePattern));
+    }
+
+    @Test
+    public void testHourRegularExpression() {
+        String hourPattern = FeedDataPath.VARS.HOUR.getPatternRegularExpression();
+        Assert.assertFalse("0".matches(hourPattern));
+        Assert.assertFalse("1".matches(hourPattern));
+        Assert.assertFalse("2".matches(hourPattern));
+        Assert.assertFalse("25".matches(hourPattern));
+        Assert.assertFalse("29".matches(hourPattern));
+        Assert.assertFalse("010".matches(hourPattern));
+        Assert.assertFalse("10 ".matches(hourPattern));
+        Assert.assertFalse(" 10".matches(hourPattern));
+
+
+        Assert.assertTrue("00".matches(hourPattern));
+        Assert.assertTrue("01".matches(hourPattern));
+        Assert.assertTrue("24".matches(hourPattern));
+        Assert.assertTrue("10".matches(hourPattern));
+        Assert.assertTrue("19".matches(hourPattern));
+        Assert.assertTrue("12".matches(hourPattern));
+    }
+
+
+    @Test
+    public void testDayRegularExpression() {
+        String dayPattern = FeedDataPath.VARS.DAY.getPatternRegularExpression();
+        Assert.assertFalse("0".matches(dayPattern));
+        Assert.assertFalse("1".matches(dayPattern));
+        Assert.assertFalse("32".matches(dayPattern));
+        Assert.assertFalse("00".matches(dayPattern));
+        Assert.assertFalse("010".matches(dayPattern));
+        Assert.assertFalse("10 ".matches(dayPattern));
+        Assert.assertFalse(" 10".matches(dayPattern));
+
+
+        Assert.assertTrue("01".matches(dayPattern));
+        Assert.assertTrue("10".matches(dayPattern));
+        Assert.assertTrue("29".matches(dayPattern));
+        Assert.assertTrue("30".matches(dayPattern));
+        Assert.assertTrue("31".matches(dayPattern));
+    }
+
+    @Test
+    public void testMonthRegularExpression() {
+        String monthPattern = FeedDataPath.VARS.MONTH.getPatternRegularExpression();
+        Assert.assertFalse("0".matches(monthPattern));
+        Assert.assertFalse("1".matches(monthPattern));
+        Assert.assertFalse("13".matches(monthPattern));
+        Assert.assertFalse("19".matches(monthPattern));
+        Assert.assertFalse("00".matches(monthPattern));
+        Assert.assertFalse("010".matches(monthPattern));
+        Assert.assertFalse("10 ".matches(monthPattern));
+        Assert.assertFalse(" 10".matches(monthPattern));
+
+
+        Assert.assertTrue("01".matches(monthPattern));
+        Assert.assertTrue("02".matches(monthPattern));
+        Assert.assertTrue("10".matches(monthPattern));
+        Assert.assertTrue("12".matches(monthPattern));
+    }
+
+    @Test
+    public void testYearRegularExpression() {
+        String yearPattern = FeedDataPath.VARS.YEAR.getPatternRegularExpression();
+        Assert.assertFalse("0".matches(yearPattern));
+        Assert.assertFalse("1".matches(yearPattern));
+        Assert.assertFalse("13".matches(yearPattern));
+        Assert.assertFalse("19".matches(yearPattern));
+        Assert.assertFalse("00".matches(yearPattern));
+        Assert.assertFalse("010".matches(yearPattern));
+        Assert.assertFalse("10 ".matches(yearPattern));
+        Assert.assertFalse(" 10".matches(yearPattern));
+
+
+        Assert.assertTrue("0001".matches(yearPattern));
+        Assert.assertTrue("2014".matches(yearPattern));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java b/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java
index 4f63806..aea28e6 100644
--- a/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java
+++ b/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java
@@ -19,7 +19,8 @@
 package org.apache.falcon.util;
 
 import org.testng.Assert;
-import org.testng.annotations.*;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -90,4 +91,19 @@ public class RadixNodeTest {
         Assert.assertTrue(normalNode.containsValue("CAS Project"));
     }
 
+    @Test
+    public void testMatchInput() {
+        FalconRadixUtils.INodeAlgorithm regexMatcher = new FalconRadixUtils.FeedRegexAlgorithm();
+        RadixNode<String> templateNode = new RadixNode<String>();
+        templateNode.setKey("/data/cas/projects/${YEAR}/${MONTH}/${DAY}");
+        // only a complete instance path of the template matches
+        Assert.assertTrue(templateNode.matches("/data/cas/projects/2014/09/09", regexMatcher));
+        Assert.assertFalse(templateNode.matches("/data/cas/projects/20140909", regexMatcher));
+        Assert.assertFalse(templateNode.matches("/data/2014/projects/2014/09/09", regexMatcher));
+        Assert.assertFalse(templateNode.matches("/data/2014/projects/2014/09/", regexMatcher));
+        Assert.assertFalse(templateNode.matches("/data/cas/projects/2014/09/09trail", regexMatcher));
+        Assert.assertFalse(templateNode.matches("/data/cas/projects/2014/09/09/", regexMatcher));
+        Assert.assertFalse(templateNode.matches("/data/cas/projects/2014/09/", regexMatcher));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java b/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java
index 28589ed..109c24d 100644
--- a/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java
+++ b/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java
@@ -34,6 +34,7 @@ import java.util.List;
 public class RadixTreeTest {
 
     private RadixTree<String> tree;
+    private FalconRadixUtils.INodeAlgorithm regexAlgorithm = new FalconRadixUtils.FeedRegexAlgorithm();
 
     @BeforeMethod
     public void setUp() {
@@ -118,6 +119,27 @@ public class RadixTreeTest {
 
     }
 
+    // Tests for find() with FeedRegexAlgorithm: ${...} variables in stored keys match concrete path values
+    @Test
+    public void testFindUsingRegex() {
+        tree.insert("/data/cas/${YEAR}/", "rtbd");
+        Assert.assertTrue(tree.find("/data/cas/2014/", regexAlgorithm).contains("rtbd"));
+        Assert.assertNull(tree.find("/data/cas/", regexAlgorithm)); // a prefix of a stored key is not a match
+        Assert.assertNull(tree.find("/data/cas/2014/09", regexAlgorithm)); // no stored key has this shape
+        Assert.assertNull(tree.find("/data/cas/${YEAR}/", regexAlgorithm)); // literal ${YEAR} in a query: no match
+
+        tree.insert("/data/cas/${YEAR}/colo", "local");
+        tree.insert("/data/cas/${YEAR}/colo", "duplicate-local"); // same key again; both values are kept
+        Assert.assertNull(tree.find("/data/cas/${YEAR}/", regexAlgorithm));
+        Assert.assertNull(tree.find("/data/cas/${YEAR}/colo", regexAlgorithm));
+        Assert.assertNull(tree.find("/data/cas/", regexAlgorithm));
+        Assert.assertTrue(tree.find("/data/cas/2014/", regexAlgorithm).contains("rtbd"));
+        Assert.assertTrue(tree.find("/data/cas/2014/colo", regexAlgorithm).contains("local"));
+        Assert.assertTrue(tree.find("/data/cas/2014/colo", regexAlgorithm).contains("duplicate-local"));
+
+
+    }
+
     // Tests for delete method
     @Test
     public void testDeleteChildOfTerminal() {


[3/3] incubator-falcon git commit: FALCON-914 Add option to search for Entities. Contributed by Ajay Yadav

Posted by sr...@apache.org.
FALCON-914 Add option to search for Entities. Contributed by Ajay Yadav


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/463f8548
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/463f8548
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/463f8548

Branch: refs/heads/master
Commit: 463f854890d14c0d6b64f00c89827347f58ede52
Parents: 45a7b98
Author: srikanth.sundarrajan <sr...@apache.org>
Authored: Fri Dec 26 10:49:48 2014 +0530
Committer: srikanth.sundarrajan <sr...@apache.org>
Committed: Fri Dec 26 10:49:48 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../java/org/apache/falcon/cli/FalconCLI.java   |  7 ++-
 .../org/apache/falcon/client/FalconClient.java  | 22 ++++----
 docs/src/site/twiki/restapi/EntityList.twiki    |  3 ++
 .../falcon/resource/AbstractEntityManager.java  | 48 +++++++++++++++---
 .../AbstractSchedulableEntityManager.java       |  2 +-
 .../proxy/SchedulableEntityManagerProxy.java    |  5 +-
 .../falcon/resource/EntityManagerTest.java      | 53 ++++++++++++++++----
 .../resource/SchedulableEntityManager.java      |  7 +--
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  5 ++
 10 files changed, 124 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5575219..e8e82d2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
   NEW FEATURES
 
   IMPROVEMENTS
+   FALCON-914 Add option to search for Entities. (Ajay Yadav via Srikanth
+   Sundarrajan) 
+
    FALCON-256 Create new API for Process dependency graph DAG which captures 
    process connected via feeds. (Ajay Yadav via Srikanth Sundarrajan)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 5797bbe..ca514c1 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -93,6 +93,7 @@ public class FalconCLI {
     public static final String OFFSET_OPT = "offset";
     public static final String NUM_RESULTS_OPT = "numResults";
     public static final String NUM_INSTANCES_OPT = "numInstances";
+    public static final String PATTERN_OPT = "pattern";
 
     public static final String INSTANCE_CMD = "instance";
     public static final String START_OPT = "start";
@@ -374,6 +375,7 @@ public class FalconCLI {
         String sortOrder = commandLine.getOptionValue(SORT_ORDER_OPT);
         String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
         String filterTags = commandLine.getOptionValue(TAGS_OPT);
+        String searchPattern = commandLine.getOptionValue(PATTERN_OPT);
         String fields = commandLine.getOptionValue(FIELDS_OPT);
         Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
         Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT),
@@ -437,7 +439,7 @@ public class FalconCLI {
             validateOrderBy(orderBy, entityAction);
             validateFilterBy(filterBy, entityAction);
             EntityList entityList = client.getEntityList(entityType, fields, filterBy,
-                    filterTags, orderBy, sortOrder, offset, numResults);
+                    filterTags, orderBy, sortOrder, offset, numResults, searchPattern);
             result = entityList != null ? entityList.toString() : "No entity of type (" + entityType + ") found.";
         }  else if (optionsList.contains(SUMMARY_OPT)) {
             validateEntityTypeForSummary(entityType);
@@ -646,6 +648,8 @@ public class FalconCLI {
         Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request");
         Option filterBy = new Option(FILTER_BY_OPT, true,
                 "Filter returned entities by the specified status");
+        Option searchPattern = new Option(PATTERN_OPT, true,
+                "Filter entities by fuzzy matching with specified pattern");
         Option filterTags = new Option(TAGS_OPT, true, "Filter returned entities by the specified tags");
         Option orderBy = new Option(ORDER_BY_OPT, true,
                 "Order returned entities by this field");
@@ -668,6 +672,7 @@ public class FalconCLI {
         entityOptions.addOption(end);
         entityOptions.addOption(fields);
         entityOptions.addOption(filterBy);
+        entityOptions.addOption(searchPattern);
         entityOptions.addOption(filterTags);
         entityOptions.addOption(orderBy);
         entityOptions.addOption(sortOrder);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index a748c58..7f1bc27 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -366,10 +366,10 @@ public class FalconClient {
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
 
     public EntityList getEntityList(String entityType, String fields, String filterBy, String filterTags,
-                                    String orderBy, String sortOrder,
-                                    Integer offset, Integer numResults) throws FalconCLIException {
+                                    String orderBy, String sortOrder, Integer offset,
+                                    Integer numResults, String searchPattern) throws FalconCLIException {
         return sendListRequest(Entities.LIST, entityType, fields, filterBy,
-                filterTags, orderBy, sortOrder, offset, numResults);
+                filterTags, orderBy, sortOrder, offset, numResults, searchPattern);
     }
 
     public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end,
@@ -589,8 +589,8 @@ public class FalconClient {
     private WebResource addParamsToResource(WebResource resource,
                                             String start, String end, String runId, String colo,
                                             String fields, String filterBy, String tags,
-                                            String orderBy, String sortOrder,
-                                            Integer offset, Integer numResults, Integer numInstances) {
+                                            String orderBy, String sortOrder, Integer offset,
+                                            Integer numResults, Integer numInstances, String searchPattern) {
 
         if (!StringUtils.isEmpty(fields)) {
             resource = resource.queryParam("fields", fields);
@@ -628,6 +628,10 @@ public class FalconClient {
         if (numInstances != null) {
             resource = resource.queryParam("numInstances", numInstances.toString());
         }
+
+        if (!StringUtils.isEmpty(searchPattern)) {
+            resource = resource.queryParam("pattern", searchPattern);
+        }
         return resource;
 
     }
@@ -645,7 +649,7 @@ public class FalconClient {
         resource = addParamsToResource(resource, start, end, null, null,
                 fields, filterBy, filterTags,
                 orderBy, sortOrder,
-                offset, numResults, numInstances);
+                offset, numResults, numInstances, null);
 
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
@@ -725,7 +729,7 @@ public class FalconClient {
                 .path(entity);
 
         resource = addParamsToResource(resource, start, end, runid, colo,
-                null, filterBy, null, orderBy, sortOrder, offset, numResults, null);
+                null, filterBy, null, orderBy, sortOrder, offset, numResults, null, null);
 
         if (lifeCycles != null) {
             checkLifeCycleOption(lifeCycles, type);
@@ -778,11 +782,11 @@ public class FalconClient {
     //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
     private EntityList sendListRequest(Entities entities, String entityType, String fields, String filterBy,
                                        String filterTags, String orderBy, String sortOrder, Integer offset,
-                                       Integer numResults) throws FalconCLIException {
+                                       Integer numResults, String searchPattern) throws FalconCLIException {
         WebResource resource = service.path(entities.path)
                 .path(entityType);
         resource = addParamsToResource(resource, null, null, null, null, fields, filterBy, filterTags,
-                orderBy, sortOrder, offset, numResults, null);
+                orderBy, sortOrder, offset, numResults, null, searchPattern);
 
         ClientResponse clientResponse = resource
                 .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/docs/src/site/twiki/restapi/EntityList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityList.twiki b/docs/src/site/twiki/restapi/EntityList.twiki
index b569ade..5e11691 100644
--- a/docs/src/site/twiki/restapi/EntityList.twiki
+++ b/docs/src/site/twiki/restapi/EntityList.twiki
@@ -11,6 +11,9 @@ Get list of the entities.
    * :entity-type Valid options are cluster, feed or process.
    * fields <optional param> Fields of entity that the user wants to view, separated by commas.
       * Valid options are STATUS, TAGS, PIPELINES.
+   * pattern <optional param> Return only entities whose name contains the characters of this pattern in the same order, though not necessarily adjacent. Example: pattern=abc
+      * Matching is case-insensitive.
+      * For example, the pattern mhs matches a process named New-My-Hourly-Summary.
    * filterBy <optional param> Filter results by list of field:value pairs. Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs
       * Supported filter fields are NAME, STATUS, PIPELINES, CLUSTER.
       * Query will do an AND among filterBy fields.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 8ec9e2d..4a686e7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -32,7 +32,10 @@ import org.apache.falcon.entity.parser.EntityParserFactory;
 import org.apache.falcon.entity.parser.ValidationException;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.store.EntityAlreadyExistsException;
-import org.apache.falcon.entity.v0.*;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityGraph;
+import org.apache.falcon.entity.v0.EntityIntegrityChecker;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.resource.APIResult.Status;
 import org.apache.falcon.resource.EntityList.EntityElement;
@@ -51,7 +54,18 @@ import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
  * A base class for managing Entity operations.
@@ -524,13 +538,15 @@ public abstract class AbstractEntityManager {
      * @return EntityList
      */
     public EntityList getEntityList(String type, String fieldStr, String filterBy, String filterTags,
-                                    String orderBy, String sortOrder, Integer offset, Integer resultsPerPage) {
+                                    String orderBy, String sortOrder, Integer offset, Integer resultsPerPage,
+                                    String pattern) {
 
         HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.toLowerCase().split(",")));
         validateEntityFilterByClause(filterBy);
         List<Entity> entities;
         try {
-            entities = getEntities(type, "", "", "", filterBy, filterTags, orderBy, sortOrder, offset, resultsPerPage);
+            entities = getEntities(type, "", "", "", filterBy, filterTags, orderBy, sortOrder, offset,
+                    resultsPerPage, pattern);
         } catch (Exception e) {
             LOG.error("Failed to get entity list", e);
             throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
@@ -554,8 +570,8 @@ public abstract class AbstractEntityManager {
     }
 
     protected List<Entity> getEntities(String type, String startDate, String endDate, String cluster,
-                                       String filterBy, String filterTags, String orderBy, String sortOrder,
-                                       int offset, int resultsPerPage) throws FalconException, IOException {
+                                       String filterBy, String filterTags, String orderBy, String sortOrder, int offset,
+                                       int resultsPerPage, String pattern) throws FalconException, IOException {
         final Map<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
         final List<String> filterByTags = getFilterByTags(filterTags);
 
@@ -593,6 +609,10 @@ public abstract class AbstractEntityManager {
                     filterByFieldsValues, filterByTags, tags, pipelines)) {
                 continue;
             }
+
+            if (StringUtils.isNotBlank(pattern) && !fuzzySearch(entity.getName(), pattern)) {
+                continue;
+            }
             entities.add(entity);
         }
         // Sort entities before returning a subset of entity elements.
@@ -607,6 +627,32 @@ public abstract class AbstractEntityManager {
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
+    /**
+     * Case-insensitive subsequence ("fuzzy") match of a pattern against an entity name.
+     * Returns true when every character of {@code pattern} occurs in {@code entityName}
+     * in the same order, though not necessarily adjacent; e.g. "mhs" matches
+     * "New-My-Hourly-Summary" but "moh" does not match "My-Hourly-Summary". An empty
+     * pattern matches every name.
+     *
+     * @param entityName name of the entity being filtered
+     * @param pattern    search pattern supplied by the caller
+     * @return true if the lower-cased pattern is a subsequence of the lower-cased name
+     */
+    boolean fuzzySearch(String entityName, String pattern) {
+        // Locale.ROOT keeps case folding independent of the JVM default locale
+        // (the no-arg toLowerCase() maps 'I' to the dotless i, U+0131, under a Turkish locale).
+        char[] searchPattern = pattern.toLowerCase(java.util.Locale.ROOT).toCharArray();
+        String name = entityName.toLowerCase(java.util.Locale.ROOT);
+
+        int currentIndex = 0; // index of the next pattern character still to be matched
+        for (int i = 0; i < name.length() && currentIndex < searchPattern.length; i++) {
+            if (name.charAt(i) == searchPattern[currentIndex]) {
+                currentIndex++;
+            }
+        }
+        return currentIndex == searchPattern.length;
+    }
+
     private boolean filterEntityByDatesAndCluster(Entity entity, String startDate, String endDate, String cluster)
         throws FalconException {
         if (StringUtils.isEmpty(cluster)) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index d994e25..a4d1f8b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -185,7 +185,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
             entities = getEntities(type,
                     SchemaHelper.getDateFormat().format(startAndEndDates.first),
                     SchemaHelper.getDateFormat().format(startAndEndDates.second),
-                    cluster, filterBy, filterTags, orderBy, sortOrder, offset, resultsPerPage);
+                    cluster, filterBy, filterTags, orderBy, sortOrder, offset, resultsPerPage, "");
             colo = ((Cluster) configStore.get(EntityType.CLUSTER, cluster)).getColo();
         } catch (Exception e) {
             LOG.error("Failed to get entities", e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 075cb64..cfa70a0 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -399,8 +399,9 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
                                     @DefaultValue("asc") @QueryParam("sortOrder") String sortOrder,
                                     @DefaultValue("0") @QueryParam("offset") Integer offset,
                                     @DefaultValue(DEFAULT_NUM_RESULTS)
-                                    @QueryParam("numResults") Integer resultsPerPage) {
-        return super.getEntityList(type, fields, filterBy, tags, orderBy, sortOrder, offset, resultsPerPage);
+                                    @QueryParam("numResults") Integer resultsPerPage,
+                                    @QueryParam("pattern") String pattern) {
+        return super.getEntityList(type, fields, filterBy, tags, orderBy, sortOrder, offset, resultsPerPage, pattern);
     }
 
     @GET

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
index 1862e39..470f866 100644
--- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.resource;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
@@ -125,7 +126,7 @@ public class EntityManagerTest extends AbstractEntityManager {
          * Only one entity should be returned when the auth is enabled.
          */
         try {
-            getEntityList("process", "", "", "", "", "", 0, 10);
+            getEntityList("process", "", "", "", "", "", 0, 10, "");
             Assert.fail();
         } catch (Throwable ignore) {
             // do nothing
@@ -142,7 +143,7 @@ public class EntityManagerTest extends AbstractEntityManager {
         Entity process2 = buildProcess("processAuthUser", System.getProperty("user.name"), "", "");
         configStore.publish(EntityType.PROCESS, process2);
 
-        EntityList entityList = this.getEntityList("process", "", "", "", "", "asc", 0, 10);
+        EntityList entityList = this.getEntityList("process", "", "", "", "", "asc", 0, 10, "");
         Assert.assertNotNull(entityList.getElements());
         Assert.assertEquals(entityList.getElements().length, 1);
 
@@ -151,7 +152,7 @@ public class EntityManagerTest extends AbstractEntityManager {
          */
         StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
         CurrentUser.authenticate(System.getProperty("user.name"));
-        entityList = this.getEntityList("process", "", "", "", "", "desc", 0, 10);
+        entityList = this.getEntityList("process", "", "", "", "", "desc", 0, 10, "");
         Assert.assertNotNull(entityList.getElements());
         Assert.assertEquals(entityList.getElements().length, 1);
 
@@ -188,7 +189,7 @@ public class EntityManagerTest extends AbstractEntityManager {
         configStore.publish(EntityType.PROCESS, process4);
 
         EntityList entityList = this.getEntityList("process", "tags", "PIPELINES:dataReplicationPipeline",
-                "", "name", "desc", 1, 1);
+                "", "name", "desc", 1, 1, "");
         Assert.assertNotNull(entityList.getElements());
         Assert.assertEquals(entityList.getElements().length, 1);
         Assert.assertEquals(entityList.getElements()[0].name, "process1");
@@ -198,7 +199,7 @@ public class EntityManagerTest extends AbstractEntityManager {
 
 
         entityList = this.getEntityList("process", "pipelines", "",
-                "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 0, 2);
+                "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 0, 2, "");
         Assert.assertNotNull(entityList.getElements());
         Assert.assertEquals(entityList.getElements().length, 2);
         Assert.assertEquals(entityList.getElements()[1].name, "process2");
@@ -207,17 +208,17 @@ public class EntityManagerTest extends AbstractEntityManager {
         Assert.assertEquals(entityList.getElements()[0].tag, null);
 
         entityList = this.getEntityList("process", "pipelines", "",
-                "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 10, 2);
+                "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 10, 2, "");
         Assert.assertEquals(entityList.getElements().length, 0);
 
         entityList = this.getEntityList("process", "pipelines", "",
-                "owner=producer@xyz.com", "name", "", 1, 2);
+                "owner=producer@xyz.com", "name", "", 1, 2, "");
         Assert.assertEquals(entityList.getElements().length, 2);
 
         // Test negative value for numResults, should throw an exception.
         try {
             this.getEntityList("process", "pipelines", "",
-                    "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 10, -1);
+                    "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 10, -1, "");
             Assert.assertTrue(false);
         } catch (Throwable e) {
             Assert.assertTrue(true);
@@ -226,13 +227,47 @@ public class EntityManagerTest extends AbstractEntityManager {
         // Test invalid entry for sortOrder
         try {
             this.getEntityList("process", "pipelines", "",
-                    "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "invalid", 10, 2);
+                    "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "invalid", 10, 2, "");
             Assert.assertTrue(false);
         } catch (Throwable e) {
             Assert.assertTrue(true);
         }
     }
 
+    @Test
+    public void testSearch() throws FalconException {
+        Assert.assertFalse(fuzzySearch("My-Hourly-Summary", "moh")); // no 'h' after the first 'o'
+        Assert.assertTrue(fuzzySearch("New-My-Hourly-Summary", "MHs"));
+        Assert.assertTrue(fuzzySearch("My-Hourly-Summary", "mhs"));
+    }
+
+    @Test
+    public void testGetEntityListWithPattern() throws FalconException {
+        String user = System.getProperty("user.name");
+        // Only New-My-Hourly-Summary both has dataReplicationPipeline and fuzzy-matches "mhs".
+        Entity process1 = buildProcess("New-My-Hourly-Summary", user,
+                "consumer=consumer@xyz.com, owner=producer@xyz.com",
+                "testPipeline,dataReplicationPipeline");
+        configStore.publish(EntityType.PROCESS, process1);
+
+        Entity process2 = buildProcess("Random-Summary-Generator", user,
+                "consumer=consumer@xyz.com, owner=producer@xyz.com",
+                "testPipeline,dataReplicationPipeline");
+        configStore.publish(EntityType.PROCESS, process2);
+
+        Entity process3 = buildProcess("My-Hourly-Summary", user, "", "testPipeline");
+        configStore.publish(EntityType.PROCESS, process3);
+
+        Entity process4 = buildProcess("sample-process4", user, "owner=producer@xyz.com", "");
+        configStore.publish(EntityType.PROCESS, process4);
+
+        EntityList entityList = this.getEntityList("process", "tags", "PIPELINES:dataReplicationPipeline",
+                "", "name", "desc", 0, 10, "mhs");
+        Assert.assertNotNull(entityList.getElements());
+        Assert.assertEquals(entityList.getElements().length, 1);
+        Assert.assertEquals(entityList.getElements()[0].name, "New-My-Hourly-Summary");
+    }
+
     private Entity buildProcess(String name, String username, String tags, String pipelines) {
         ACL acl = new ACL();
         acl.setOwner(username);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 5f4f495..d8d8bfc 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -57,7 +57,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
     @GET
     @Path("list/{type}")
     @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
-    @Monitored(event = "dependencies")
+    @Monitored(event = "list")
     @Override
     public EntityList getEntityList(@Dimension("type") @PathParam("type") String type,
                                     @DefaultValue("") @QueryParam("fields") String fields,
@@ -67,8 +67,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
                                     @DefaultValue("asc") @QueryParam("sortOrder") String sortOrder,
                                     @DefaultValue("0") @QueryParam("offset") Integer offset,
                                     @DefaultValue(DEFAULT_NUM_RESULTS)
-                                    @QueryParam("numResults") Integer resultsPerPage) {
-        return super.getEntityList(type, fields, filterBy, tags, orderBy, sortOrder, offset, resultsPerPage);
+                                    @QueryParam("numResults") Integer resultsPerPage,
+                                    @QueryParam("pattern") String pattern) {
+        return super.getEntityList(type, fields, filterBy, tags, orderBy, sortOrder, offset, resultsPerPage, pattern);
     }
 
     @GET

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index b50999d..118003f 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -547,6 +547,11 @@ public class FalconCLIIT {
 
         // test entity List cli
         Assert.assertEquals(executeWithURL("entity -list -type cluster" + " -offset 0 -numResults 1"), 0);
+
+        Assert.assertEquals(executeWithURL("entity -list -type process -fields status "
+                + " -filterBy STATUS:SUBMITTED,TYPE:process -orderBy name "
+                + " -sortOrder asc -offset 1 -numResults 1 -pattern abc"), 0);
+
         Assert.assertEquals(executeWithURL("entity -list -type process -fields status "
                 + " -filterBy STATUS:SUBMITTED,TYPE:process -orderBy name "
                 + " -sortOrder asc -offset 1 -numResults 1"), 0);


[2/3] incubator-falcon git commit: FALCON-256 Create new API for Process dependency graph DAG which captures process connected via feeds. Contributed by Ajay Yadav

Posted by sr...@apache.org.
FALCON-256 Create new API for Process dependency graph DAG which captures process connected via feeds. Contributed by Ajay Yadav


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/45a7b989
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/45a7b989
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/45a7b989

Branch: refs/heads/master
Commit: 45a7b989bfdad6943dc0090c7cea2e098862c9a9
Parents: 06ffdf9
Author: srikanth.sundarrajan <sr...@apache.org>
Authored: Fri Dec 26 10:35:32 2014 +0530
Committer: srikanth.sundarrajan <sr...@apache.org>
Committed: Fri Dec 26 10:35:32 2014 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../apache/falcon/cli/FalconMetadataCLI.java    |  17 +-
 .../org/apache/falcon/client/FalconClient.java  |  17 +-
 .../falcon/resource/LineageGraphResult.java     | 165 +++++++++++++++++++
 docs/src/site/twiki/FalconCLI.twiki             |  17 ++
 docs/src/site/twiki/restapi/EntityLineage.twiki |  39 +++++
 docs/src/site/twiki/restapi/ResourceList.twiki  |   1 +
 pom.xml                                         |   6 +
 prism/pom.xml                                   |   7 +
 .../metadata/LineageMetadataResource.java       | 102 ++++++++++++
 .../metadata/LineageMetadataResourceTest.java   |   8 +
 .../resource/metadata/MetadataTestContext.java  |  18 ++
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  24 +++
 13 files changed, 422 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af4bd9e..5575219 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
   NEW FEATURES
 
   IMPROVEMENTS
+   FALCON-256 Create new API for Process dependency graph DAG which captures 
+   process connected via feeds. (Ajay Yadav via Srikanth Sundarrajan)
+
    FALCON-823 Add path matching ability to the radix tree (Ajay Yadav
    via Srikanth Sundarrajan) 
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
index 63af415..515d328 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
@@ -54,6 +54,8 @@ public class FalconMetadataCLI {
     public static final String VERTEX_CMD = "vertex";
     public static final String VERTICES_CMD = "vertices";
     public static final String VERTEX_EDGES_CMD = "edges";
+    public static final String PIPELINE_OPT = "pipeline";
+
 
     public static final String EDGE_CMD = "edge";
     public static final String ID_OPT = "id";
@@ -78,8 +80,12 @@ public class FalconMetadataCLI {
         String key = commandLine.getOptionValue(KEY_OPT);
         String value = commandLine.getOptionValue(VALUE_OPT);
         String direction = commandLine.getOptionValue(DIRECTION_OPT);
+        String pipeline = commandLine.getOptionValue(PIPELINE_OPT);
 
-        if (optionsList.contains(LIST_OPT)) {
+        if (optionsList.contains(LINEAGE_OPT)) {
+            validatePipelineName(pipeline);
+            result = client.getEntityLineageGraph(pipeline).getDotNotation();
+        } else if (optionsList.contains(LIST_OPT)) {
             validateDimensionType(dimensionType.toUpperCase());
             result = client.getDimensionList(dimensionType, cluster);
         } else if (optionsList.contains(RELATIONS_OPT)) {
@@ -105,6 +111,12 @@ public class FalconMetadataCLI {
         OUT.get().println(result);
     }
 
+    private void validatePipelineName(String pipeline) throws FalconCLIException {
+        if (StringUtils.isBlank(pipeline)) { // null, empty and whitespace-only names are all invalid
+            throw new FalconCLIException("Invalid value for pipeline");
+        }
+    }
+
     private void validateDimensionType(String dimensionType) throws FalconCLIException {
         if (StringUtils.isEmpty(dimensionType)
                 ||  dimensionType.contains("INSTANCE")) {
@@ -157,6 +169,8 @@ public class FalconMetadataCLI {
         Option lineage = new Option(LINEAGE_OPT, false, "Get falcon metadata lineage information");
         group.addOption(discovery);
         group.addOption(lineage);
+        Option pipeline = new Option(PIPELINE_OPT, true,
+                "Get lineage graph for the entities in a pipeline");
         metadataOptions.addOptionGroup(group);
 
         // Add discovery options
@@ -172,6 +186,7 @@ public class FalconMetadataCLI {
         Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
 
         // Add lineage options
+        metadataOptions.addOption(pipeline);
 
         metadataOptions.addOption(url);
         metadataOptions.addOption(type);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 5c476ae..a748c58 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -38,6 +38,7 @@ import org.apache.falcon.resource.EntitySummaryResult;
 import org.apache.falcon.resource.FeedInstanceResult;
 import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.LineageGraphResult;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
 import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
@@ -210,7 +211,8 @@ public class FalconClient {
         LIST("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         RELATIONS("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON),
         VERTICES("api/metadata/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON),
-        EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON);
+        EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON),
+        LINEAGE("api/metadata/lineage/entities", HttpMethod.GET, MediaType.APPLICATION_JSON);
 
         private String path;
         private String method;
@@ -507,6 +509,19 @@ public class FalconClient {
         return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster);
     }
 
+    public LineageGraphResult getEntityLineageGraph(String pipelineName) throws FalconCLIException {
+        final MetadataOperations lineageOp = MetadataOperations.LINEAGE;
+        WebResource lineageResource = service.path(lineageOp.path).queryParam(FalconMetadataCLI.PIPELINE_OPT,
+                pipelineName);
+        // LINEAGE request carrying the pipeline name as a query param, authenticated with the auth cookie
+        ClientResponse response = lineageResource.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(lineageOp.mimeType)
+                .type(lineageOp.mimeType)
+                .method(lineageOp.method, ClientResponse.class);
+        checkIfSuccessful(response);
+        return response.getEntity(LineageGraphResult.class);
+    }
+
     public String getDimensionRelations(String dimensionType, String dimensionName) throws FalconCLIException {
         return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
new file mode 100644
index 0000000..acf5d11
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.commons.lang.StringUtils;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * LineageGraphResult is the output returned by all the apis returning a DAG.
+ * JAXB-annotated for marshalling; {@link #getDotNotation()} renders it in Graphviz DOT format.
+ */
+@XmlRootElement(name = "result")
+@XmlAccessorType (XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class LineageGraphResult {
+
+    private String[] vertices;
+
+    @XmlElement(name="edges")
+    private Edge[] edges;
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(LineageGraphResult.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public LineageGraphResult() {
+        // default constructor for JAXB
+    }
+
+    /**
+     * A class to represent an edge in a DAG.
+     */
+    @XmlRootElement(name = "edge")
+    @XmlAccessorType(XmlAccessType.FIELD)
+    public static class Edge {
+        @XmlElement
+        private String from;
+        @XmlElement
+        private String to;
+        @XmlElement
+        private String label;
+
+        public Edge() {
+            // default constructor for JAXB
+        }
+
+        public Edge(String from, String to, String label) {
+            this.from = from;
+            this.to = to;
+            this.label = label;
+        }
+
+        public String getFrom() {
+            return from;
+        }
+
+        public void setFrom(String from) {
+            this.from = from;
+        }
+
+        public String getTo() {
+            return to;
+        }
+
+        public void setTo(String to) {
+            this.to = to;
+        }
+
+        public String getLabel() {
+            return label;
+        }
+
+        public void setLabel(String label) {
+            this.label = label;
+        }
+
+        /** Renders this edge as a DOT edge statement, or "" unless from, to and label are all non-blank. */
+        public String getDotNotation() {
+            StringBuilder result = new StringBuilder();
+            if (StringUtils.isNotBlank(this.from) && StringUtils.isNotBlank(this.to)
+                    && StringUtils.isNotBlank(this.label)) {
+                result.append(quote(this.from)).append(" -> ").append(quote(this.to));
+                result.append(" [ label = ").append(quote(this.label)).append(" ] \n");
+            }
+            return result.toString();
+        }
+
+        @Override
+        public String toString() {
+            return getDotNotation();
+        }
+    }
+
+    /** Wraps {@code id} in double quotes as a DOT ID, escaping embedded quotes; null renders as "null". */
+    private static String quote(String id) {
+        return "\"" + String.valueOf(id).replace("\"", "\\\"") + "\"";
+    }
+
+    /** Renders the whole graph in DOT: one line per vertex, then one statement per drawable edge. */
+    public String getDotNotation() {
+        StringBuilder result = new StringBuilder();
+        result.append("digraph g{ \n");
+        if (this.vertices != null) {
+            for (String v : this.vertices) {
+                result.append(quote(v)).append("\n");
+            }
+        }
+        if (this.edges != null) {
+            for (Edge e : this.edges) {
+                result.append(e.getDotNotation());
+            }
+        }
+        result.append("}\n");
+        return result.toString();
+    }
+
+    public String[] getVertices() {
+        return vertices;
+    }
+
+    public void setVertices(String[] vertices) {
+        this.vertices = vertices;
+    }
+
+    public Edge[] getEdges() {
+        return edges;
+    }
+
+    public void setEdges(Edge[] edges) {
+        this.edges = edges;
+    }
+
+    @Override
+    public String toString() {
+        return getDotNotation();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index d8199dd..d37cf8c 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -56,6 +56,9 @@ Optional Args : -fields <<field1,field2>> -filterBy <<field1:value1,field2:value
 
 <a href="./Restapi/EntityList.html">Optional params described here.</a>
 
+
+
+
 ---+++Summary
 
 Summary of entities of a particular type and a cluster will be listed. Entity summary has N most recent instances of entity.
@@ -255,6 +258,20 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -
 
 ---++ Metadata Lineage Options
 
+---+++Lineage
+
+Returns the relationships between the processes and feeds of a given pipeline in <a href="http://www.graphviz.org/content/dot-language">dot</a> format.
+You can paste the output into a Graphviz viewer, such as <a href="http://graphviz-dev.appspot.com/">this online one</a>, to see the DAG graphically.
+
+
+Usage:
+
+$FALCON_HOME/bin/falcon metadata -lineage -pipeline my-pipeline
+
+The -pipeline option is mandatory.
+
+
+
 ---+++ Vertex
 
 Get the vertex with the specified id.

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/EntityLineage.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityLineage.twiki b/docs/src/site/twiki/restapi/EntityLineage.twiki
new file mode 100644
index 0000000..ea747b1
--- /dev/null
+++ b/docs/src/site/twiki/restapi/EntityLineage.twiki
@@ -0,0 +1,39 @@
+---++  GET api/metadata/lineage/entities?pipeline=:pipeline
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+It returns the graph depicting the relationship between the various processes and feeds in a given pipeline.
+
+---++ Parameters
+   * :pipeline is the name of the pipeline
+
+---++ Results
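+A JSON object whose "vertices" are the names of the pipeline's processes and whose "edges" link a producing process ("from") to a consuming process ("to"), labelled with the feed that connects them ("label").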
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/metadata/lineage/entities?pipeline=my-pipeline
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    "vertices": ["my-minutely-process", "my-hourly-process"],
+    "edges":
+    [
+        {
+         "from"  : "my-minutely-process",
+         "to"    : "my-hourly-process",
+         "label" : "my-minutely-feed"
+        },
+        {
+         "from"  : "my-hourly-process",
+         "to"    : "my-minutely-process",
+         "label" : "my-hourly-feedback"
+        }
+    ]
+}
+</verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index a87818b..2368631 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -77,6 +77,7 @@ See also: [[../Security.twiki][Security in Falcon]]
 | GET         | [[AdjacentVertices][api/metadata/lineage/vertices/:id/:direction]]                     | get the adjacent vertices or edges of the vertex with the specified direction |
 | GET         | [[AllEdges][api/metadata/lineage/edges/all]]                                           | get all edges                                                                 |
 | GET         | [[Edge][api/metadata/lineage/edges/:id]]                                               | get the edge with the specified id                                            |
+| GET         | [[EntityLineage][api/metadata/lineage/entities?pipeline=:name]]                        | get lineage graph for processes and feeds in the specified pipeline           |
 
 ---++ REST Call on Metadata Discovery Resource
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5a6c095..1b3a6c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -625,6 +625,12 @@
             </dependency>
 
             <dependency>
+                <groupId>com.tinkerpop.gremlin</groupId>
+                <artifactId>gremlin-java</artifactId>
+                <version>2.6.0</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.springframework</groupId>
                 <artifactId>spring-beans</artifactId>
                 <version>3.0.3.RELEASE</version>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/pom.xml
----------------------------------------------------------------------
diff --git a/prism/pom.xml b/prism/pom.xml
index 26e577d..43cc4b4 100644
--- a/prism/pom.xml
+++ b/prism/pom.xml
@@ -61,6 +61,13 @@
         </dependency>
 
         <dependency>
+            <groupId>com.tinkerpop.gremlin</groupId>
+            <artifactId>gremlin-java</artifactId>
+        </dependency>
+
+
+
+        <dependency>
             <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-test-util</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
index 2404be4..0c6b2b6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.resource.metadata;
 
+import com.google.common.collect.Sets;
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Element;
@@ -25,12 +26,22 @@ import com.tinkerpop.blueprints.Vertex;
 import com.tinkerpop.blueprints.VertexQuery;
 import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
 import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
+import com.tinkerpop.gremlin.java.GremlinPipeline;
 import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.metadata.GraphUtils;
 import org.apache.falcon.metadata.RelationshipLabel;
 import org.apache.falcon.metadata.RelationshipProperty;
 import org.apache.falcon.metadata.RelationshipType;
+import org.apache.falcon.monitors.Dimension;
+import org.apache.falcon.monitors.Monitored;
+import org.apache.falcon.resource.LineageGraphResult;
 import org.apache.falcon.util.StartupProperties;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
@@ -48,7 +59,10 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Jersey Resource for lineage metadata operations.
@@ -81,6 +95,37 @@ public class LineageMetadataResource extends AbstractMetadataResource {
         }
     }
 
+    /** Returns the lineage DAG (processes linked by feeds) of a pipeline; 400 if it is blank or unknown. */
+    @GET
+    @Path("/entities")
+    @Produces({MediaType.APPLICATION_JSON})
+    @Monitored(event = "entity-lineage")
+    public Response getEntityLineageGraph(@Dimension("pipeline") @QueryParam("pipeline") final String pipeline) {
+        LOG.info("Get lineage Graph for pipeline:({})", pipeline);
+        if (StringUtils.isBlank(pipeline)) { // a missing name is a client error, so 400 rather than 500
+            throw FalconWebException.newException("Pipeline name can not be blank",
+                    Response.Status.BAD_REQUEST);
+        }
+        try {
+            Iterable<Vertex> pipelineNode = getGraph().getVertices(RelationshipProperty.NAME.getName(),
+                    pipeline);
+            if (!pipelineNode.iterator().hasNext()) {
+                throw FalconWebException.newException("No pipelines found for " + pipeline,
+                        Response.Status.BAD_REQUEST);
+            }
+            Vertex v = pipelineNode.iterator().next(); // pipeline names are unique
+            Iterable<Vertex> processes = new GremlinPipeline(v).in(RelationshipLabel.PIPELINES.getName())
+                    .has(RelationshipProperty.TYPE.getName(), RelationshipType.PROCESS_ENTITY.getName());
+            return Response.ok(buildJSONGraph(processes)).build();
+        } catch (WebApplicationException e) {
+            // already carries its intended status (e.g. the BAD_REQUEST above); must not become a 500
+            throw e;
+        } catch (Exception e) {
+            LOG.error("Error while fetching entity lineage: ", e);
+            throw FalconWebException.newException(e, Response.Status.INTERNAL_SERVER_ERROR);
+        }
+    }
+
     /**
      * Get all vertices.
      *
@@ -392,6 +437,63 @@ public class LineageMetadataResource extends AbstractMetadataResource {
         return response;
     }
 
+    private LineageGraphResult buildJSONGraph(Iterable<Vertex> processes) throws  FalconException {
+        LineageGraphResult result = new LineageGraphResult();
+
+        List<String> vertexArray = new LinkedList<String>();
+        List<LineageGraphResult.Edge> edgeArray = new LinkedList<LineageGraphResult.Edge>();
+
+        // feed name -> processes producing/consuming it; sets so repeated inputs/outputs don't duplicate edges
+        Map<String, Set<String>> feedProducerMap = new HashMap<String, Set<String>>();
+        Map<String, Set<String>> feedConsumerMap = new HashMap<String, Set<String>>();
+
+        if (processes != null) {
+            for (Vertex process : processes) {
+                String processName = process.getProperty(RelationshipProperty.NAME.getName());
+                vertexArray.add(processName);
+                Process producer = ConfigurationStore.get().get(EntityType.PROCESS, processName);
+                if (producer != null) {
+                    if (producer.getOutputs() != null) {
+                        for (Output output : producer.getOutputs().getOutputs()) {
+                            addToMultiMap(feedProducerMap, output.getFeed(), processName);
+                        }
+                    }
+                    if (producer.getInputs() != null) {
+                        for (Input input : producer.getInputs().getInputs()) {
+                            addToMultiMap(feedConsumerMap, input.getFeed(), processName);
+                        }
+                    }
+                }
+            }
+            LOG.debug("feedProducerMap = {}", feedProducerMap);
+            LOG.debug("feedConsumerMap = {}", feedConsumerMap);
+
+            // discard feeds which aren't edges between two processes
+            Set<String> pipelineFeeds = Sets.intersection(feedProducerMap.keySet(), feedConsumerMap.keySet());
+            for (String feedName : pipelineFeeds) {
+                // make an edge from every producer of the feed to every consumer of it
+                for (String producerProcess : feedProducerMap.get(feedName)) {
+                    for (String consumerProcess : feedConsumerMap.get(feedName)) {
+                        edgeArray.add(new LineageGraphResult.Edge(producerProcess, consumerProcess, feedName));
+                    }
+                }
+            }
+        }
+
+        result.setEdges(edgeArray.toArray(new LineageGraphResult.Edge[edgeArray.size()]));
+        result.setVertices(vertexArray.toArray(new String[vertexArray.size()]));
+        LOG.debug("result = {}", result);
+        return result;
+    }
+
+    /** Adds {@code value} to the set kept under {@code key} in {@code map}, creating the set on first use. */
+    private static void addToMultiMap(Map<String, Set<String>> map, String key, String value) {
+        if (!map.containsKey(key)) {
+            map.put(key, Sets.<String>newLinkedHashSet());
+        }
+        map.get(key).add(value);
+    }
+
     private static void validateInputs(String errorMsg, String... inputs) {
         for (String input : inputs) {
             if (StringUtils.isEmpty(input)) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index cabb44c..ac0e51f 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -351,6 +351,14 @@ public class LineageMetadataResourceTest {
         }
     }
 
+    @Test
+    public void testEntityLineage() throws Exception {
+        testContext.addConsumerProcess(); // publishes CHILD_PROCESS_ENTITY_NAME, built with "testPipeline"
+        Response lineageResponse = new LineageMetadataResource().getEntityLineageGraph("testPipeline");
+        int expectedStatus = Response.Status.OK.getStatusCode();
+        Assert.assertEquals(lineageResponse.getStatus(), expectedStatus);
+    }
+
     private void assertBasicVertexProperties(Vertex vertex, Map vertexProperties) {
         RelationshipProperty[] properties = {
             RelationshipProperty.NAME,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
index aaddf62..6f798a8 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
@@ -51,6 +51,7 @@ public class MetadataTestContext {
     public static final String OPERATION = "GENERATE";
 
     public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String CHILD_PROCESS_ENTITY_NAME = "sample-child-process";
     public static final String PROCESS_ENTITY_NAME = "sample-process";
     public static final String COLO_NAME = "west-coast";
     public static final String WORKFLOW_NAME = "imp-click-join-workflow";
@@ -171,6 +172,23 @@ public class MetadataTestContext {
         configStore.publish(EntityType.PROCESS, processEntity);
     }
 
+    public void addConsumerProcess() throws Exception {
+        // wires inputFeeds as this process's outputs and outputFeeds as its inputs (pipeline "testPipeline")
+        org.apache.falcon.entity.v0.process.Process consumer = EntityBuilderTestUtil.buildProcess(
+                CHILD_PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical", "testPipeline");
+        EntityBuilderTestUtil.addProcessWorkflow(consumer, WORKFLOW_NAME, WORKFLOW_VERSION);
+
+        for (Feed feed : inputFeeds) {
+            EntityBuilderTestUtil.addOutput(consumer, feed);
+        }
+
+        for (Feed feed : outputFeeds) {
+            EntityBuilderTestUtil.addInput(consumer, feed);
+        }
+
+        configStore.publish(EntityType.PROCESS, consumer);
+    }
+
     public void addInstance() throws Exception {
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
                 WorkflowExecutionContext.Type.POST_PROCESSING);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 9c6ad80..b50999d 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -487,6 +487,30 @@ public class FalconCLIIT {
                 + " -file " + createTempJobPropertiesFile()), 0);
     }
 
+
+    @Test
+    public void testEntityLineage() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        // submit a cluster, two feeds and a process from the test templates,
+        // then check that the lineage sub-command exits with status 0
+        String clusterXml = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+        context.setCluster(overlay.get("cluster"));
+        Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + clusterXml), 0);
+
+        String feed1Xml = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + feed1Xml), 0);
+
+        String feed2Xml = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + feed2Xml), 0);
+
+        String processXml = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+        Assert.assertEquals(executeWithURL("entity -submit -type process -file " + processXml), 0);
+
+        Assert.assertEquals(executeWithURL("metadata -lineage -pipeline testPipeline"), 0);
+    }
+
     @Test
     public void testEntityPaginationFilterByCommands() throws Exception {