You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2014/12/26 06:32:11 UTC
[1/3] incubator-falcon git commit: FALCON-823 Add path matching
ability to the radix tree. Contributed by Ajay Yadav
Repository: incubator-falcon
Updated Branches:
refs/heads/master 9827ac9e1 -> 463f85489
FALCON-823 Add path matching ability to the radix tree. Contributed by Ajay Yadav
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/06ffdf91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/06ffdf91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/06ffdf91
Branch: refs/heads/master
Commit: 06ffdf913fed07dcb8eee81f4aa0f0779b051ed0
Parents: 9827ac9
Author: srikanth.sundarrajan <sr...@apache.org>
Authored: Fri Dec 26 10:34:16 2014 +0530
Committer: srikanth.sundarrajan <sr...@apache.org>
Committed: Fri Dec 26 10:34:16 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../falcon/entity/common/FeedDataPath.java | 15 +-
.../falcon/entity/store/FeedPathStore.java | 5 +
.../apache/falcon/util/FalconRadixUtils.java | 314 +++++++++++++++++++
.../java/org/apache/falcon/util/RadixNode.java | 23 ++
.../java/org/apache/falcon/util/RadixTree.java | 38 ++-
.../apache/falcon/entity/FeedDataPathTest.java | 124 ++++++++
.../org/apache/falcon/util/RadixNodeTest.java | 18 +-
.../org/apache/falcon/util/RadixTreeTest.java | 22 ++
9 files changed, 544 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5d529b9..af4bd9e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
+ FALCON-823 Add path matching ability to the radix tree (Ajay Yadav
+ via Srikanth Sundarrajan)
+
FALCON-329 Falcon client methods should return objects. (Samar via Shwetha GS)
FALCON-593 Preserve data type for properties in a vertex. (Ajay
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
index 39e636b..6ededbb 100644
--- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
+++ b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
@@ -30,14 +30,25 @@ public final class FeedDataPath {
* Standard variables for feed time components.
*/
public static enum VARS {
- YEAR("yyyy"), MONTH("MM"), DAY("dd"), HOUR("HH"), MINUTE("mm");
+ YEAR("yyyy", "([0-9]{4})"), MONTH("MM", "(0[1-9]|1[0-2])"), DAY("dd", "(0[1-9]|1[0-9]|2[0-9]|3[0-1])"),
+ HOUR("HH", "([0-1][0-9]|2[0-4])"), MINUTE("mm", "([0-5][0-9]|60)");
private final Pattern pattern;
private final String datePattern;
+ private final String patternRegularExpression;
- private VARS(String datePattern) {
+ private VARS(String datePattern, String patternRegularExpression) {
pattern = Pattern.compile("\\$\\{" + name() + "\\}");
this.datePattern = datePattern;
+ this.patternRegularExpression = patternRegularExpression;
+ }
+
+ public String getPatternRegularExpression() {
+ return patternRegularExpression;
+ }
+
+ public String getDatePattern() {
+ return datePattern;
}
public String regex() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
index 97d21c1..1be12fe 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/FeedPathStore.java
@@ -18,6 +18,8 @@
package org.apache.falcon.entity.store;
+import org.apache.falcon.util.FalconRadixUtils;
+
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
@@ -34,6 +36,9 @@ public interface FeedPathStore<T> {
int getSize();
@Nullable
+ Collection<T> find(@Nonnull String key, @Nonnull FalconRadixUtils.INodeAlgorithm algorithm);
+
+ @Nullable
Collection<T> find(@Nonnull String key);
boolean delete(@Nonnull String key, @Nonnull T value);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
new file mode 100644
index 0000000..bbd73c7
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/FalconRadixUtils.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.entity.common.FeedDataPath;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Falcon specific utilities for the Radix Tree.
+ */
+public class FalconRadixUtils {
+
+ /**
+ * Strategy used by the radix tree to compare a node's key with an input string. Implementations decide
+ * whether the comparison is a regular expression based match or a character by character match.
+ */
+ public interface INodeAlgorithm {
+
+ /**
+ * Checks if the given key and input match.
+ * @param key key of the node
+ * @param input input String to be matched against key.
+ * @return true if key and input match.
+ */
+ boolean match(String key, String input);
+
+ /**
+ * Checks if the node's key is a prefix of the input.
+ * @param key key of the node
+ * @param input input String which is expected to begin with the key.
+ * @return true if input starts with key.
+ */
+ boolean startsWith(String key, String input);
+
+ /**
+ * Finds next node to take for traversal among currentNode's children.
+ * @param currentNode node of the RadixTree which has been matched.
+ * @param input input String to be searched.
+ * @return Node to be traversed next, or null if no child qualifies.
+ */
+ RadixNode getNextCandidate(RadixNode currentNode, String input);
+
+ /**
+ * For the given node and input key, finds the remaining text to be matched with the child sub tree.
+ * @param currentNode node whose key covers the beginning of the input.
+ * @param key input String being searched.
+ * @return the part of key that is left once the text covered by currentNode's key is removed.
+ */
+ String getRemainingText(RadixNode currentNode, String key);
+ }
+
+ /**
+ * This Algorithm does a plain string comparison for all
+ * type of operations on a node.
+ */
+ static class StringAlgorithm implements INodeAlgorithm {
+
+ @Override
+ public boolean match(String key, String input) {
+ return StringUtils.equals(key, input);
+ }
+
+ @Override
+ public boolean startsWith(String nodeKey, String inputKey) {
+ return inputKey.startsWith(nodeKey);
+ }
+
+ @Override
+ public RadixNode getNextCandidate(RadixNode currentNode, String input) {
+ RadixNode newRoot = null;
+ String remainingText = input.substring(currentNode.getKey().length());
+ List<RadixNode> result = currentNode.getChildren();
+ for(RadixNode child : result){
+ if (child.getKey().charAt(0) == remainingText.charAt(0)){
+ newRoot = child;
+ break;
+ }
+ }
+ return newRoot;
+ }
+
+ @Override
+ public String getRemainingText(RadixNode currentNode, String key) {
+ return key.substring(currentNode.getKey().length());
+ }
+
+
+ }
+
+ static class FeedRegexAlgorithm implements INodeAlgorithm {
+
+ /**
+ * Matches a feed path template against the path string of a feed instance.
+ *
+ * The template is split into variable parts (e.g. ${YEAR}) and literal parts, and the input is cut into
+ * pieces of corresponding length. Variable parts are compared using the variable's regular expression,
+ * literal parts are compared as plain text.
+ * e.g. Given templateString (/data/cas/${YEAR}/${MONTH}/${DAY}) and inputString (/data/cas/2014/09/09)
+ * the function will return true.
+ * @param templateString Node's key (Feed's template path)
+ * @param inputString String to be matched against templateString (instance's path)
+ * @return true if the templateString and inputString match, false otherwise. A blank templateString
+ * never matches.
+ */
+ @Override
+ public boolean match(String templateString, String inputString) {
+ if (StringUtils.isBlank(templateString)) {
+ return false;
+ }
+ List<String> expectedParts = getPartsInPathTemplate(templateString);
+ List<String> actualParts = getCorrespondingParts(inputString, expectedParts);
+
+ // a different number of parts means the input is shorter or longer than the template allows
+ int partCount = expectedParts.size();
+ if (actualParts.size() != partCount) {
+ return false;
+ }
+
+ for (int index = 0; index < partCount; index++) {
+ if (!matchPart(expectedParts.get(index), actualParts.get(index))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * Finds if the given template (the current node's key) is a prefix of the given inputString or not.
+ *
+ * @param inputTemplate template String, may contain feed path variables
+ * @param inputString instance path String to be checked
+ * @return true if inputString starts with text matching inputTemplate, false otherwise. A blank
+ * inputString never qualifies, a blank inputTemplate is a prefix of any non blank inputString.
+ */
+ @Override
+ public boolean startsWith(String inputTemplate, String inputString) {
+ if (StringUtils.isBlank(inputString)) {
+ return false;
+ }
+ if (StringUtils.isBlank(inputTemplate)) {
+ return true;
+ }
+
+ // cut the template into variable/literal parts and the input into pieces of corresponding length
+ List<String> templateParts = getPartsInPathTemplate(inputTemplate);
+ List<String> inputParts = getCorrespondingParts(inputString, templateParts);
+
+ // the input ran out before the template did
+ if (inputParts.size() < templateParts.size()) {
+ return false;
+ }
+
+ // only the leading parts covered by the template have to match
+ for (int index = 0; index < templateParts.size(); index++) {
+ if (!matchPart(templateParts.get(index), inputParts.get(index))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public RadixNode getNextCandidate(RadixNode currentNode, String input) {
+ RadixNode newRoot = null;
+ // replace the regex with pattern's length
+ String remainingText = input.substring(getPatternsEffectiveLength(currentNode.getKey()));
+ List<RadixNode> result = currentNode.getChildren();
+ for(RadixNode child : result) {
+ String key = child.getKey();
+ if (key.startsWith("${")) {
+ // get the regex
+ String regex = key.substring(0, key.indexOf("}") + 1);
+ // match the text and the regex
+ FeedDataPath.VARS var = getMatchingRegex(regex);
+ if (matchPart(regex, input.substring(0, var.getDatePattern().length()))) {
+ newRoot = child; // if it matches then this is the newRoot
+ break;
+ }
+ } else if (child.getKey().charAt(0) == remainingText.charAt(0)) {
+ newRoot = child;
+ break;
+ }
+ }
+ return newRoot;
+ }
+
+ @Override
+ public String getRemainingText(RadixNode currentNode, String inputString) {
+ // find the match length for current inputString
+ return inputString.substring(getPatternsEffectiveLength(currentNode.getKey()));
+ }
+
+ /**
+ * Computes how many characters of an instance path the given template stands for.
+ * Each ${VAR} is replaced by its date pattern (e.g. ${YEAR} by yyyy) before measuring.
+ * @param templateString template or part of a template, may be blank.
+ * @return length of the template after the replacement, 0 for a blank template.
+ */
+ private int getPatternsEffectiveLength(String templateString) {
+ if (StringUtils.isBlank(templateString)) {
+ return 0;
+ }
+ String expanded = templateString;
+ for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
+ String variable = "${" + var.name() + "}";
+ expanded = expanded.replace(variable, var.getDatePattern());
+ }
+ return expanded.length();
+ }
+
+ /**
+ * Divide a given template string into parts of regex and character strings
+ * e.g. /data/cas/${YEAR}/${MONTH}/${DAY} will be converted to
+ * [/data/cas/, ${YEAR}, /, ${MONTH}, /, ${DAY}]
+ * The text in front of every match of FeedDataPath.PATTERN is added as a part even when it is empty.
+ * @param templateString input string representing a feed's path template
+ * @return unmodifiable list of parts in input templateString which are either completely regex or
+ * normal string.
+ */
+ private List<String> getPartsInPathTemplate(String templateString) {
+ List<String> parts = new ArrayList<String>();
+ Matcher variableMatcher = FeedDataPath.PATTERN.matcher(templateString);
+ int literalStart = 0;
+ while (variableMatcher.find()) {
+ // text between the previous variable and this one, followed by the variable itself
+ parts.add(templateString.substring(literalStart, variableMatcher.start()));
+ parts.add(variableMatcher.group());
+ literalStart = variableMatcher.end();
+ }
+ // text after the last variable, if any
+ if (literalStart < templateString.length()) {
+ parts.add(templateString.substring(literalStart));
+ }
+ return Collections.unmodifiableList(parts);
+ }
+
+
+ /**
+ * Looks up the feed path variable for a template part.
+ * @param inputPart template part, something like ${YEAR}
+ * @return the variable whose regex() equals the escaped inputPart, null if there is none.
+ */
+ private FeedDataPath.VARS getMatchingRegex(String inputPart) {
+ // escape the part in the same way the variable patterns are written
+ String escapedPart = inputPart.replace("${", "\\$\\{").replace("}", "\\}");
+
+ for (FeedDataPath.VARS candidate : FeedDataPath.VARS.values()) {
+ if (StringUtils.equals(escapedPart, candidate.regex())) {
+ return candidate;
+ }
+ }
+ return null;
+ }
+
+
+ /**
+ * Divides a string into corresponding parts for the template to carry out comparison.
+ * templateParts = [/data/cas/, ${YEAR}, /, ${MONTH}, /, ${DAY}]
+ * inputString = /data/cas/2014/09/09
+ * returns [/data/cas/, 2014, /, 09, /, 09]
+ * Text left over once all template parts are served is returned as one additional last part.
+ * @param inputString normal string representing feed instance path
+ * @param templateParts parts of feed's path template broken into regex and non-regex units.
+ * @return a list of strings where each part of the list corresponds to a part in list of template parts.
+ */
+ private List<String> getCorrespondingParts(String inputString, List<String> templateParts) {
+ List<String> stringParts = new ArrayList<String>();
+ String unconsumed = inputString;
+ for (int index = 0; index < templateParts.size() && StringUtils.isNotBlank(unconsumed); index++) {
+ // take as many characters as the template part stands for, or whatever is left
+ int effectiveLength = getPatternsEffectiveLength(templateParts.get(index));
+ int length = Math.min(effectiveLength, unconsumed.length());
+ stringParts.add(unconsumed.substring(0, length));
+ unconsumed = unconsumed.substring(length);
+ }
+ if (StringUtils.isNotBlank(unconsumed)) {
+ stringParts.add(unconsumed);
+ }
+ return stringParts;
+ }
+
+ /**
+ * Compare a pure regex or pure string part with a given string.
+ *
+ * For a regex part (one beginning with ${) only the leading characters of the input, as many as the
+ * variable's date pattern is long, are compared with the variable's regular expression. For any other
+ * part the input has to begin with the template text.
+ *
+ * @param template template part, which can either be a pure regex or pure non-regex string.
+ * @param input input String to be matched against the template part.
+ * @return true if the input string matches the template, false otherwise. An unknown variable or an
+ * input shorter than the variable's value never matches.
+ */
+ private boolean matchPart(String template, String input) {
+ if (!template.startsWith("${")) {
+ // do exact match with normal strings
+ return input.startsWith(template);
+ }
+
+ // escape the part in the same way the variable patterns are written
+ String escapedTemplate = template.replace("${", "\\$\\{").replace("}", "\\}");
+ for (FeedDataPath.VARS var : FeedDataPath.VARS.values()) {
+ if (!StringUtils.equals(var.regex(), escapedTemplate)) {
+ continue;
+ }
+ int valueLength = var.getDatePattern().length();
+ if (input.length() < valueLength) {
+ // too short to hold a value of this variable, report a mismatch instead of failing in substring
+ return false;
+ }
+ String desiredPart = input.substring(0, valueLength);
+ Pattern pattern = Pattern.compile(var.getPatternRegularExpression());
+ return pattern.matcher(desiredPart).matches();
+ }
+ // the part looks like a variable but is not one of the known ones
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/util/RadixNode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/RadixNode.java b/common/src/main/java/org/apache/falcon/util/RadixNode.java
index 564df8e..12227cb 100644
--- a/common/src/main/java/org/apache/falcon/util/RadixNode.java
+++ b/common/src/main/java/org/apache/falcon/util/RadixNode.java
@@ -124,4 +124,27 @@ public class RadixNode<T> {
return matchLength;
}
+
+
+ /**
+ * Checks whether the node's key matches the given input.
+ *
+ * The comparison is delegated to the given algorithm, which can do either a character by character match or
+ * a regular expression match (used to match a feed instance path with feed location template).
+ * @param input input string to be matched with the key of the node.
+ * @param matcher A custom matcher algorithm to match node's key against the input. It is used when matching
+ * path of a Feed's instance to Feed's path template. If null, a plain string comparison
+ * is done.
+ * @return true if the key matches the input, false otherwise. A null input never matches.
+ */
+ public boolean matches(String input, FalconRadixUtils.INodeAlgorithm matcher) {
+ if (input == null) {
+ return false;
+ }
+
+ if (matcher != null) {
+ return matcher.match(this.getKey(), input);
+ }
+
+ // no algorithm given, compare the key literally
+ return StringUtils.equals(getKey(), input);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/main/java/org/apache/falcon/util/RadixTree.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/RadixTree.java b/common/src/main/java/org/apache/falcon/util/RadixTree.java
index 6dbe160..6cd79f5 100644
--- a/common/src/main/java/org/apache/falcon/util/RadixTree.java
+++ b/common/src/main/java/org/apache/falcon/util/RadixTree.java
@@ -189,21 +189,35 @@ public class RadixTree<T> implements FeedPathStore<T>, Formattable {
*/
@Override
@Nullable
- public synchronized Collection<T> find(@Nonnull String key) {
- if (key != null && !key.trim().isEmpty()){
- return recursiveFind(key.trim(), root);
+ public synchronized Collection<T> find(@Nonnull String key, FalconRadixUtils.INodeAlgorithm algorithm) {
+ if (key != null && !key.trim().isEmpty()) {
+ if (algorithm == null) {
+ algorithm = new FalconRadixUtils.StringAlgorithm();
+ }
+ return recursiveFind(key.trim(), root, algorithm);
+ }
+ return null;
+ }
+
+ /**
+ * Finds the values stored for the given key using plain string comparison.
+ *
+ * The method is synchronized like find(String, INodeAlgorithm); the single argument lookup was
+ * synchronized before the algorithm aware overload was introduced and must stay that way.
+ *
+ * @param key key to be looked up, leading and trailing whitespace is ignored.
+ * @return result of the lookup, null when the key is null or blank.
+ */
+ @Nullable
+ @Override
+ public synchronized Collection<T> find(@Nonnull String key) {
+ if (key != null && !key.trim().isEmpty()) {
+ FalconRadixUtils.INodeAlgorithm algorithm = new FalconRadixUtils.StringAlgorithm();
+ return recursiveFind(key.trim(), root, algorithm);
+ }
return null;
}
- private Collection<T> recursiveFind(String key, RadixNode<T> currentNode){
+ private Collection<T> recursiveFind(String key, RadixNode<T> currentNode,
+ FalconRadixUtils.INodeAlgorithm algorithm){
- if (!key.startsWith(currentNode.getKey())){
+ if (!algorithm.startsWith(currentNode.getKey(), key)){
LOG.debug("Current Node key: {} is not a prefix in the input key: {}", currentNode.getKey(), key);
return null;
}
- if (StringUtils.equals(key, currentNode.getKey())){
+ if (algorithm.match(currentNode.getKey(), key)){
if (currentNode.isTerminal()){
LOG.debug("Found the terminal node with key: {} for the given input.", currentNode.getKey());
return currentNode.getValues();
@@ -214,21 +228,15 @@ public class RadixTree<T> implements FeedPathStore<T>, Formattable {
}
//find child to follow, using remaining Text
- RadixNode<T> newRoot = null;
- String remainingText = key.substring(currentNode.getKey().length());
- for(RadixNode<T> child : currentNode.getChildren()){
- if (child.getKey().charAt(0) == remainingText.charAt(0)){
- newRoot = child;
- break;
- }
- }
+ RadixNode<T> newRoot = algorithm.getNextCandidate(currentNode, key);
+ String remainingText = algorithm.getRemainingText(currentNode, key);
if (newRoot == null){
LOG.debug("No child found to follow for further processing. Current node key {}");
return null;
}else {
LOG.debug("Recursing with new key: {} and new remainingText: {}", newRoot.getKey(), remainingText);
- return recursiveFind(remainingText, newRoot);
+ return recursiveFind(remainingText, newRoot, algorithm);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
new file mode 100644
index 0000000..c405556
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/entity/FeedDataPathTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.entity;
+
+import org.apache.falcon.entity.common.FeedDataPath;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the regular expressions of the feed path variables in FeedDataPath.VARS.
+ */
+public class FeedDataPathTest {
+
+ @Test
+ public void testMinutesRegularExpression() {
+ String minutePattern = FeedDataPath.VARS.MINUTE.getPatternRegularExpression();
+ assertNoneMatch(minutePattern, "0", "1", "61", "010", "10 ", " 10");
+ assertAllMatch(minutePattern, "00", "01", "60");
+ }
+
+ @Test
+ public void testHourRegularExpression() {
+ String hourPattern = FeedDataPath.VARS.HOUR.getPatternRegularExpression();
+ assertNoneMatch(hourPattern, "0", "1", "2", "25", "29", "010", "10 ", " 10");
+ assertAllMatch(hourPattern, "00", "01", "24", "10", "19", "12");
+ }
+
+ @Test
+ public void testDayRegularExpression() {
+ String dayPattern = FeedDataPath.VARS.DAY.getPatternRegularExpression();
+ assertNoneMatch(dayPattern, "0", "1", "32", "00", "010", "10 ", " 10");
+ assertAllMatch(dayPattern, "01", "10", "29", "30", "31");
+ }
+
+ @Test
+ public void testMonthRegularExpression() {
+ String monthPattern = FeedDataPath.VARS.MONTH.getPatternRegularExpression();
+ assertNoneMatch(monthPattern, "0", "1", "13", "19", "00", "010", "10 ", " 10");
+ assertAllMatch(monthPattern, "01", "02", "10", "12");
+ }
+
+ @Test
+ public void testYearRegularExpression() {
+ String yearPattern = FeedDataPath.VARS.YEAR.getPatternRegularExpression();
+ assertNoneMatch(yearPattern, "0", "1", "13", "19", "00", "010", "10 ", " 10");
+ assertAllMatch(yearPattern, "0001", "2014");
+ }
+
+ /**
+ * Asserts that every input matches the regular expression as a whole.
+ * The failing input is reported in the assertion message.
+ */
+ private static void assertAllMatch(String regex, String... inputs) {
+ for (String input : inputs) {
+ Assert.assertTrue(input.matches(regex), "Expected '" + input + "' to match " + regex);
+ }
+ }
+
+ /**
+ * Asserts that none of the inputs matches the regular expression as a whole.
+ * The failing input is reported in the assertion message.
+ */
+ private static void assertNoneMatch(String regex, String... inputs) {
+ for (String input : inputs) {
+ Assert.assertFalse(input.matches(regex), "Expected '" + input + "' not to match " + regex);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java b/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java
index 4f63806..aea28e6 100644
--- a/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java
+++ b/common/src/test/java/org/apache/falcon/util/RadixNodeTest.java
@@ -19,7 +19,8 @@
package org.apache.falcon.util;
import org.testng.Assert;
-import org.testng.annotations.*;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.HashSet;
@@ -90,4 +91,19 @@ public class RadixNodeTest {
Assert.assertTrue(normalNode.containsValue("CAS Project"));
}
+ @Test
+ public void testMatchInput() {
+ FalconRadixUtils.INodeAlgorithm regexMatcher = new FalconRadixUtils.FeedRegexAlgorithm();
+ RadixNode<String> templateNode = new RadixNode<String>();
+ templateNode.setKey("/data/cas/projects/${YEAR}/${MONTH}/${DAY}");
+
+ // an instance path with a value for every variable matches
+ Assert.assertTrue(templateNode.matches("/data/cas/projects/2014/09/09", regexMatcher));
+
+ // missing separators, different literal text, trailing or missing text do not match
+ Assert.assertFalse(templateNode.matches("/data/cas/projects/20140909", regexMatcher));
+ Assert.assertFalse(templateNode.matches("/data/2014/projects/2014/09/09", regexMatcher));
+ Assert.assertFalse(templateNode.matches("/data/2014/projects/2014/09/", regexMatcher));
+ Assert.assertFalse(templateNode.matches("/data/cas/projects/2014/09/09trail", regexMatcher));
+ Assert.assertFalse(templateNode.matches("/data/cas/projects/2014/09/09/", regexMatcher));
+ Assert.assertFalse(templateNode.matches("/data/cas/projects/2014/09/", regexMatcher));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/06ffdf91/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java b/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java
index 28589ed..109c24d 100644
--- a/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java
+++ b/common/src/test/java/org/apache/falcon/util/RadixTreeTest.java
@@ -34,6 +34,7 @@ import java.util.List;
public class RadixTreeTest {
private RadixTree<String> tree;
+ private FalconRadixUtils.INodeAlgorithm regexAlgorithm = new FalconRadixUtils.FeedRegexAlgorithm();
@BeforeMethod
public void setUp() {
@@ -118,6 +119,27 @@ public class RadixTreeTest {
}
+ // Tests for find using the regular expression based algorithm
+ @Test
+ public void testFindUsingRegex() {
+ // a single template: only instance paths covering the whole template are found
+ tree.insert("/data/cas/${YEAR}/", "rtbd");
+ Assert.assertTrue(tree.find("/data/cas/2014/", regexAlgorithm).contains("rtbd"));
+ Assert.assertNull(tree.find("/data/cas/", regexAlgorithm));
+ Assert.assertNull(tree.find("/data/cas/2014/09", regexAlgorithm));
+ Assert.assertNull(tree.find("/data/cas/${YEAR}/", regexAlgorithm));
+
+ // a longer template with two values is stored below the first one
+ tree.insert("/data/cas/${YEAR}/colo", "local");
+ tree.insert("/data/cas/${YEAR}/colo", "duplicate-local");
+
+ // templates themselves and incomplete paths are still not found
+ Assert.assertNull(tree.find("/data/cas/${YEAR}/", regexAlgorithm));
+ Assert.assertNull(tree.find("/data/cas/${YEAR}/colo", regexAlgorithm));
+ Assert.assertNull(tree.find("/data/cas/", regexAlgorithm));
+
+ // instance paths resolve to the values of their own template
+ Assert.assertTrue(tree.find("/data/cas/2014/", regexAlgorithm).contains("rtbd"));
+ Assert.assertTrue(tree.find("/data/cas/2014/colo", regexAlgorithm).contains("local"));
+ Assert.assertTrue(tree.find("/data/cas/2014/colo", regexAlgorithm).contains("duplicate-local"));
+ }
+
// Tests for delete method
@Test
public void testDeleteChildOfTerminal() {
[3/3] incubator-falcon git commit: FALCON-914 Add option to search
for Entities. Contributed by Ajay Yadav
Posted by sr...@apache.org.
FALCON-914 Add option to search for Entities. Contributed by Ajay Yadav
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/463f8548
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/463f8548
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/463f8548
Branch: refs/heads/master
Commit: 463f854890d14c0d6b64f00c89827347f58ede52
Parents: 45a7b98
Author: srikanth.sundarrajan <sr...@apache.org>
Authored: Fri Dec 26 10:49:48 2014 +0530
Committer: srikanth.sundarrajan <sr...@apache.org>
Committed: Fri Dec 26 10:49:48 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../java/org/apache/falcon/cli/FalconCLI.java | 7 ++-
.../org/apache/falcon/client/FalconClient.java | 22 ++++----
docs/src/site/twiki/restapi/EntityList.twiki | 3 ++
.../falcon/resource/AbstractEntityManager.java | 48 +++++++++++++++---
.../AbstractSchedulableEntityManager.java | 2 +-
.../proxy/SchedulableEntityManagerProxy.java | 5 +-
.../falcon/resource/EntityManagerTest.java | 53 ++++++++++++++++----
.../resource/SchedulableEntityManager.java | 7 +--
.../java/org/apache/falcon/cli/FalconCLIIT.java | 5 ++
10 files changed, 124 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5575219..e8e82d2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
+ FALCON-914 Add option to search for Entities. (Ajay Yadav via Srikanth
+ Sundarrajan)
+
FALCON-256 Create new API for Process dependency graph DAG which captures
process connected via feeds. (Ajay Yadav via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 5797bbe..ca514c1 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -93,6 +93,7 @@ public class FalconCLI {
public static final String OFFSET_OPT = "offset";
public static final String NUM_RESULTS_OPT = "numResults";
public static final String NUM_INSTANCES_OPT = "numInstances";
+ public static final String PATTERN_OPT = "pattern";
public static final String INSTANCE_CMD = "instance";
public static final String START_OPT = "start";
@@ -374,6 +375,7 @@ public class FalconCLI {
String sortOrder = commandLine.getOptionValue(SORT_ORDER_OPT);
String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
String filterTags = commandLine.getOptionValue(TAGS_OPT);
+ String searchPattern = commandLine.getOptionValue(PATTERN_OPT);
String fields = commandLine.getOptionValue(FIELDS_OPT);
Integer offset = parseIntegerInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
Integer numResults = parseIntegerInput(commandLine.getOptionValue(NUM_RESULTS_OPT),
@@ -437,7 +439,7 @@ public class FalconCLI {
validateOrderBy(orderBy, entityAction);
validateFilterBy(filterBy, entityAction);
EntityList entityList = client.getEntityList(entityType, fields, filterBy,
- filterTags, orderBy, sortOrder, offset, numResults);
+ filterTags, orderBy, sortOrder, offset, numResults, searchPattern);
result = entityList != null ? entityList.toString() : "No entity of type (" + entityType + ") found.";
} else if (optionsList.contains(SUMMARY_OPT)) {
validateEntityTypeForSummary(entityType);
@@ -646,6 +648,8 @@ public class FalconCLI {
Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request");
Option filterBy = new Option(FILTER_BY_OPT, true,
"Filter returned entities by the specified status");
+ Option searchPattern = new Option(PATTERN_OPT, true,
+ "Filter entities by fuzzy matching with specified pattern");
Option filterTags = new Option(TAGS_OPT, true, "Filter returned entities by the specified tags");
Option orderBy = new Option(ORDER_BY_OPT, true,
"Order returned entities by this field");
@@ -668,6 +672,7 @@ public class FalconCLI {
entityOptions.addOption(end);
entityOptions.addOption(fields);
entityOptions.addOption(filterBy);
+ entityOptions.addOption(searchPattern);
entityOptions.addOption(filterTags);
entityOptions.addOption(orderBy);
entityOptions.addOption(sortOrder);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index a748c58..7f1bc27 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -366,10 +366,10 @@ public class FalconClient {
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public EntityList getEntityList(String entityType, String fields, String filterBy, String filterTags,
- String orderBy, String sortOrder,
- Integer offset, Integer numResults) throws FalconCLIException {
+ String orderBy, String sortOrder, Integer offset,
+ Integer numResults, String searchPattern) throws FalconCLIException {
return sendListRequest(Entities.LIST, entityType, fields, filterBy,
- filterTags, orderBy, sortOrder, offset, numResults);
+ filterTags, orderBy, sortOrder, offset, numResults, searchPattern);
}
public EntitySummaryResult getEntitySummary(String entityType, String cluster, String start, String end,
@@ -589,8 +589,8 @@ public class FalconClient {
private WebResource addParamsToResource(WebResource resource,
String start, String end, String runId, String colo,
String fields, String filterBy, String tags,
- String orderBy, String sortOrder,
- Integer offset, Integer numResults, Integer numInstances) {
+ String orderBy, String sortOrder, Integer offset,
+ Integer numResults, Integer numInstances, String searchPattern) {
if (!StringUtils.isEmpty(fields)) {
resource = resource.queryParam("fields", fields);
@@ -628,6 +628,10 @@ public class FalconClient {
if (numInstances != null) {
resource = resource.queryParam("numInstances", numInstances.toString());
}
+
+ if (!StringUtils.isEmpty(searchPattern)) {
+ resource = resource.queryParam("pattern", searchPattern);
+ }
return resource;
}
@@ -645,7 +649,7 @@ public class FalconClient {
resource = addParamsToResource(resource, start, end, null, null,
fields, filterBy, filterTags,
orderBy, sortOrder,
- offset, numResults, numInstances);
+ offset, numResults, numInstances, null);
ClientResponse clientResponse = resource
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
@@ -725,7 +729,7 @@ public class FalconClient {
.path(entity);
resource = addParamsToResource(resource, start, end, runid, colo,
- null, filterBy, null, orderBy, sortOrder, offset, numResults, null);
+ null, filterBy, null, orderBy, sortOrder, offset, numResults, null, null);
if (lifeCycles != null) {
checkLifeCycleOption(lifeCycles, type);
@@ -778,11 +782,11 @@ public class FalconClient {
//SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
private EntityList sendListRequest(Entities entities, String entityType, String fields, String filterBy,
String filterTags, String orderBy, String sortOrder, Integer offset,
- Integer numResults) throws FalconCLIException {
+ Integer numResults, String searchPattern) throws FalconCLIException {
WebResource resource = service.path(entities.path)
.path(entityType);
resource = addParamsToResource(resource, null, null, null, null, fields, filterBy, filterTags,
- orderBy, sortOrder, offset, numResults, null);
+ orderBy, sortOrder, offset, numResults, null, searchPattern);
ClientResponse clientResponse = resource
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/docs/src/site/twiki/restapi/EntityList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityList.twiki b/docs/src/site/twiki/restapi/EntityList.twiki
index b569ade..5e11691 100644
--- a/docs/src/site/twiki/restapi/EntityList.twiki
+++ b/docs/src/site/twiki/restapi/EntityList.twiki
@@ -11,6 +11,9 @@ Get list of the entities.
* :entity-type Valid options are cluster, feed or process.
* fields <optional param> Fields of entity that the user wants to view, separated by commas.
* Valid options are STATUS, TAGS, PIPELINES.
+ * pattern <optional param> Return only entities whose name contains the characters of this pattern in the same order, though not necessarily adjacent. Example: pattern=abc
+ * Matching is case insensitive.
+ * For example, the pattern mhs will match a process named New-My-Hourly-Summary.
* filterBy <optional param> Filter results by list of field:value pairs. Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs
* Supported filter fields are NAME, STATUS, PIPELINES, CLUSTER.
* Query will do an AND among filterBy fields.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 8ec9e2d..4a686e7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -32,7 +32,10 @@ import org.apache.falcon.entity.parser.EntityParserFactory;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.store.EntityAlreadyExistsException;
-import org.apache.falcon.entity.v0.*;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityGraph;
+import org.apache.falcon.entity.v0.EntityIntegrityChecker;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.resource.APIResult.Status;
import org.apache.falcon.resource.EntityList.EntityElement;
@@ -51,7 +54,18 @@ import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
/**
* A base class for managing Entity operations.
@@ -524,13 +538,15 @@ public abstract class AbstractEntityManager {
* @return EntityList
*/
public EntityList getEntityList(String type, String fieldStr, String filterBy, String filterTags,
- String orderBy, String sortOrder, Integer offset, Integer resultsPerPage) {
+ String orderBy, String sortOrder, Integer offset, Integer resultsPerPage,
+ String pattern) {
HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.toLowerCase().split(",")));
validateEntityFilterByClause(filterBy);
List<Entity> entities;
try {
- entities = getEntities(type, "", "", "", filterBy, filterTags, orderBy, sortOrder, offset, resultsPerPage);
+ entities = getEntities(type, "", "", "", filterBy, filterTags, orderBy, sortOrder, offset,
+ resultsPerPage, pattern);
} catch (Exception e) {
LOG.error("Failed to get entity list", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
@@ -554,8 +570,8 @@ public abstract class AbstractEntityManager {
}
protected List<Entity> getEntities(String type, String startDate, String endDate, String cluster,
- String filterBy, String filterTags, String orderBy, String sortOrder,
- int offset, int resultsPerPage) throws FalconException, IOException {
+ String filterBy, String filterTags, String orderBy, String sortOrder, int offset,
+ int resultsPerPage, String pattern) throws FalconException, IOException {
final Map<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
final List<String> filterByTags = getFilterByTags(filterTags);
@@ -593,6 +609,10 @@ public abstract class AbstractEntityManager {
filterByFieldsValues, filterByTags, tags, pipelines)) {
continue;
}
+
+ if (StringUtils.isNotBlank(pattern) && !fuzzySearch(entity.getName(), pattern)) {
+ continue;
+ }
entities.add(entity);
}
// Sort entities before returning a subset of entity elements.
@@ -607,6 +627,22 @@ public abstract class AbstractEntityManager {
}
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
+ boolean fuzzySearch(String enityName, String pattern) {
+ int currentIndex = 0; // current index in pattern which is to be matched
+ char[] searchPattern = pattern.toLowerCase().toCharArray();
+ String name = enityName.toLowerCase();
+
+ for (Character c : name.toCharArray()) {
+ if (currentIndex < searchPattern.length && c == searchPattern[currentIndex]) {
+ currentIndex++;
+ }
+ if (currentIndex == searchPattern.length) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private boolean filterEntityByDatesAndCluster(Entity entity, String startDate, String endDate, String cluster)
throws FalconException {
if (StringUtils.isEmpty(cluster)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index d994e25..a4d1f8b 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -185,7 +185,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
entities = getEntities(type,
SchemaHelper.getDateFormat().format(startAndEndDates.first),
SchemaHelper.getDateFormat().format(startAndEndDates.second),
- cluster, filterBy, filterTags, orderBy, sortOrder, offset, resultsPerPage);
+ cluster, filterBy, filterTags, orderBy, sortOrder, offset, resultsPerPage, "");
colo = ((Cluster) configStore.get(EntityType.CLUSTER, cluster)).getColo();
} catch (Exception e) {
LOG.error("Failed to get entities", e);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 075cb64..cfa70a0 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -399,8 +399,9 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@DefaultValue("asc") @QueryParam("sortOrder") String sortOrder,
@DefaultValue("0") @QueryParam("offset") Integer offset,
@DefaultValue(DEFAULT_NUM_RESULTS)
- @QueryParam("numResults") Integer resultsPerPage) {
- return super.getEntityList(type, fields, filterBy, tags, orderBy, sortOrder, offset, resultsPerPage);
+ @QueryParam("numResults") Integer resultsPerPage,
+ @QueryParam("pattern") String pattern) {
+ return super.getEntityList(type, fields, filterBy, tags, orderBy, sortOrder, offset, resultsPerPage, pattern);
}
@GET
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
index 1862e39..470f866 100644
--- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.falcon.resource;
import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -125,7 +126,7 @@ public class EntityManagerTest extends AbstractEntityManager {
* Only one entity should be returned when the auth is enabled.
*/
try {
- getEntityList("process", "", "", "", "", "", 0, 10);
+ getEntityList("process", "", "", "", "", "", 0, 10, "");
Assert.fail();
} catch (Throwable ignore) {
// do nothing
@@ -142,7 +143,7 @@ public class EntityManagerTest extends AbstractEntityManager {
Entity process2 = buildProcess("processAuthUser", System.getProperty("user.name"), "", "");
configStore.publish(EntityType.PROCESS, process2);
- EntityList entityList = this.getEntityList("process", "", "", "", "", "asc", 0, 10);
+ EntityList entityList = this.getEntityList("process", "", "", "", "", "asc", 0, 10, "");
Assert.assertNotNull(entityList.getElements());
Assert.assertEquals(entityList.getElements().length, 1);
@@ -151,7 +152,7 @@ public class EntityManagerTest extends AbstractEntityManager {
*/
StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
CurrentUser.authenticate(System.getProperty("user.name"));
- entityList = this.getEntityList("process", "", "", "", "", "desc", 0, 10);
+ entityList = this.getEntityList("process", "", "", "", "", "desc", 0, 10, "");
Assert.assertNotNull(entityList.getElements());
Assert.assertEquals(entityList.getElements().length, 1);
@@ -188,7 +189,7 @@ public class EntityManagerTest extends AbstractEntityManager {
configStore.publish(EntityType.PROCESS, process4);
EntityList entityList = this.getEntityList("process", "tags", "PIPELINES:dataReplicationPipeline",
- "", "name", "desc", 1, 1);
+ "", "name", "desc", 1, 1, "");
Assert.assertNotNull(entityList.getElements());
Assert.assertEquals(entityList.getElements().length, 1);
Assert.assertEquals(entityList.getElements()[0].name, "process1");
@@ -198,7 +199,7 @@ public class EntityManagerTest extends AbstractEntityManager {
entityList = this.getEntityList("process", "pipelines", "",
- "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 0, 2);
+ "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 0, 2, "");
Assert.assertNotNull(entityList.getElements());
Assert.assertEquals(entityList.getElements().length, 2);
Assert.assertEquals(entityList.getElements()[1].name, "process2");
@@ -207,17 +208,17 @@ public class EntityManagerTest extends AbstractEntityManager {
Assert.assertEquals(entityList.getElements()[0].tag, null);
entityList = this.getEntityList("process", "pipelines", "",
- "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 10, 2);
+ "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 10, 2, "");
Assert.assertEquals(entityList.getElements().length, 0);
entityList = this.getEntityList("process", "pipelines", "",
- "owner=producer@xyz.com", "name", "", 1, 2);
+ "owner=producer@xyz.com", "name", "", 1, 2, "");
Assert.assertEquals(entityList.getElements().length, 2);
// Test negative value for numResults, should throw an exception.
try {
this.getEntityList("process", "pipelines", "",
- "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 10, -1);
+ "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "", 10, -1, "");
Assert.assertTrue(false);
} catch (Throwable e) {
Assert.assertTrue(true);
@@ -226,13 +227,47 @@ public class EntityManagerTest extends AbstractEntityManager {
// Test invalid entry for sortOrder
try {
this.getEntityList("process", "pipelines", "",
- "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "invalid", 10, 2);
+ "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", "invalid", 10, 2, "");
Assert.assertTrue(false);
} catch (Throwable e) {
Assert.assertTrue(true);
}
}
+ @Test
+ public void testSearch() throws FalconException {
+ Assert.assertTrue(fuzzySearch("My-Hourly-Summary", "mhs"));
+ Assert.assertTrue(fuzzySearch("New-My-Hourly-Summary", "MHs"));
+ Assert.assertFalse(fuzzySearch("My-Hourly-Summary", "moh"));
+ }
+
+ @Test
+ public void testGetEntityListWithPattern() throws FalconException {
+ String user = System.getProperty("user.name");
+
+ Entity process1 = buildProcess("New-My-Hourly-Summary", user,
+ "consumer=consumer@xyz.com, owner=producer@xyz.com",
+ "testPipeline,dataReplicationPipeline");
+ configStore.publish(EntityType.PROCESS, process1);
+
+ Entity process2 = buildProcess("Random-Summary-Generator", user,
+ "consumer=consumer@xyz.com, owner=producer@xyz.com",
+ "testPipeline,dataReplicationPipeline");
+ configStore.publish(EntityType.PROCESS, process2);
+
+ Entity process3 = buildProcess("My-Hourly-Summary", user, "", "testPipeline");
+ configStore.publish(EntityType.PROCESS, process3);
+
+ Entity process4 = buildProcess("sample-process4", user, "owner=producer@xyz.com", "");
+ configStore.publish(EntityType.PROCESS, process4);
+
+ EntityList entityList = this.getEntityList("process", "tags", "PIPELINES:dataReplicationPipeline",
+ "", "name", "desc", 0, 10, "mhs");
+ Assert.assertNotNull(entityList.getElements());
+ Assert.assertEquals(entityList.getElements().length, 1);
+
+ }
+
private Entity buildProcess(String name, String username, String tags, String pipelines) {
ACL acl = new ACL();
acl.setOwner(username);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 5f4f495..d8d8bfc 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -57,7 +57,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
@GET
@Path("list/{type}")
@Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
- @Monitored(event = "dependencies")
+ @Monitored(event = "list")
@Override
public EntityList getEntityList(@Dimension("type") @PathParam("type") String type,
@DefaultValue("") @QueryParam("fields") String fields,
@@ -67,8 +67,9 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
@DefaultValue("asc") @QueryParam("sortOrder") String sortOrder,
@DefaultValue("0") @QueryParam("offset") Integer offset,
@DefaultValue(DEFAULT_NUM_RESULTS)
- @QueryParam("numResults") Integer resultsPerPage) {
- return super.getEntityList(type, fields, filterBy, tags, orderBy, sortOrder, offset, resultsPerPage);
+ @QueryParam("numResults") Integer resultsPerPage,
+ @QueryParam("pattern") String pattern) {
+ return super.getEntityList(type, fields, filterBy, tags, orderBy, sortOrder, offset, resultsPerPage, pattern);
}
@GET
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/463f8548/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index b50999d..118003f 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -547,6 +547,11 @@ public class FalconCLIIT {
// test entity List cli
Assert.assertEquals(executeWithURL("entity -list -type cluster" + " -offset 0 -numResults 1"), 0);
+
+ Assert.assertEquals(executeWithURL("entity -list -type process -fields status "
+ + " -filterBy STATUS:SUBMITTED,TYPE:process -orderBy name "
+ + " -sortOrder asc -offset 1 -numResults 1 -pattern abc"), 0);
+
Assert.assertEquals(executeWithURL("entity -list -type process -fields status "
+ " -filterBy STATUS:SUBMITTED,TYPE:process -orderBy name "
+ " -sortOrder asc -offset 1 -numResults 1"), 0);
[2/3] incubator-falcon git commit: FALCON-256 Create new API for
Process dependency graph DAG which captures process connected via feeds.
Contributed by Ajay Yadav
Posted by sr...@apache.org.
FALCON-256 Create new API for Process dependency graph DAG which captures process connected via feeds. Contributed by Ajay Yadav
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/45a7b989
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/45a7b989
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/45a7b989
Branch: refs/heads/master
Commit: 45a7b989bfdad6943dc0090c7cea2e098862c9a9
Parents: 06ffdf9
Author: srikanth.sundarrajan <sr...@apache.org>
Authored: Fri Dec 26 10:35:32 2014 +0530
Committer: srikanth.sundarrajan <sr...@apache.org>
Committed: Fri Dec 26 10:35:32 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../apache/falcon/cli/FalconMetadataCLI.java | 17 +-
.../org/apache/falcon/client/FalconClient.java | 17 +-
.../falcon/resource/LineageGraphResult.java | 165 +++++++++++++++++++
docs/src/site/twiki/FalconCLI.twiki | 17 ++
docs/src/site/twiki/restapi/EntityLineage.twiki | 39 +++++
docs/src/site/twiki/restapi/ResourceList.twiki | 1 +
pom.xml | 6 +
prism/pom.xml | 7 +
.../metadata/LineageMetadataResource.java | 102 ++++++++++++
.../metadata/LineageMetadataResourceTest.java | 8 +
.../resource/metadata/MetadataTestContext.java | 18 ++
.../java/org/apache/falcon/cli/FalconCLIIT.java | 24 +++
13 files changed, 422 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af4bd9e..5575219 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,9 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
+ FALCON-256 Create new API for Process dependency graph DAG which captures
+ process connected via feeds. (Ajay Yadav via Srikanth Sundarrajan)
+
FALCON-823 Add path matching ability to the radix tree (Ajay Yadav
via Srikanth Sundarrajan)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
index 63af415..515d328 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
@@ -54,6 +54,8 @@ public class FalconMetadataCLI {
public static final String VERTEX_CMD = "vertex";
public static final String VERTICES_CMD = "vertices";
public static final String VERTEX_EDGES_CMD = "edges";
+ public static final String PIPELINE_OPT = "pipeline";
+
public static final String EDGE_CMD = "edge";
public static final String ID_OPT = "id";
@@ -78,8 +80,12 @@ public class FalconMetadataCLI {
String key = commandLine.getOptionValue(KEY_OPT);
String value = commandLine.getOptionValue(VALUE_OPT);
String direction = commandLine.getOptionValue(DIRECTION_OPT);
+ String pipeline = commandLine.getOptionValue(PIPELINE_OPT);
- if (optionsList.contains(LIST_OPT)) {
+ if (optionsList.contains(LINEAGE_OPT)) {
+ validatePipelineName(pipeline);
+ result = client.getEntityLineageGraph(pipeline).getDotNotation();
+ } else if (optionsList.contains(LIST_OPT)) {
validateDimensionType(dimensionType.toUpperCase());
result = client.getDimensionList(dimensionType, cluster);
} else if (optionsList.contains(RELATIONS_OPT)) {
@@ -105,6 +111,12 @@ public class FalconMetadataCLI {
OUT.get().println(result);
}
+ private void validatePipelineName(String pipeline) throws FalconCLIException {
+ if (StringUtils.isEmpty(pipeline)) {
+ throw new FalconCLIException("Invalid value for pipeline");
+ }
+ }
+
private void validateDimensionType(String dimensionType) throws FalconCLIException {
if (StringUtils.isEmpty(dimensionType)
|| dimensionType.contains("INSTANCE")) {
@@ -157,6 +169,8 @@ public class FalconMetadataCLI {
Option lineage = new Option(LINEAGE_OPT, false, "Get falcon metadata lineage information");
group.addOption(discovery);
group.addOption(lineage);
+ Option pipeline = new Option(PIPELINE_OPT, true,
+ "Get lineage graph for the entities in a pipeline");
metadataOptions.addOptionGroup(group);
// Add discovery options
@@ -172,6 +186,7 @@ public class FalconMetadataCLI {
Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
// Add lineage options
+ metadataOptions.addOption(pipeline);
metadataOptions.addOption(url);
metadataOptions.addOption(type);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 5c476ae..a748c58 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -38,6 +38,7 @@ import org.apache.falcon.resource.EntitySummaryResult;
import org.apache.falcon.resource.FeedInstanceResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
+import org.apache.falcon.resource.LineageGraphResult;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
@@ -210,7 +211,8 @@ public class FalconClient {
LIST("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON),
RELATIONS("api/metadata/discovery/", HttpMethod.GET, MediaType.APPLICATION_JSON),
VERTICES("api/metadata/lineage/vertices", HttpMethod.GET, MediaType.APPLICATION_JSON),
- EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON);
+ EDGES("api/metadata/lineage/edges", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ LINEAGE("api/metadata/lineage/entities", HttpMethod.GET, MediaType.APPLICATION_JSON);
private String path;
private String method;
@@ -507,6 +509,19 @@ public class FalconClient {
return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster);
}
+ public LineageGraphResult getEntityLineageGraph(String pipelineName) throws FalconCLIException {
+ MetadataOperations operation = MetadataOperations.LINEAGE;
+ WebResource resource = service.path(operation.path)
+ .queryParam(FalconMetadataCLI.PIPELINE_OPT, pipelineName);
+
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(operation.mimeType).type(operation.mimeType)
+ .method(operation.method, ClientResponse.class);
+ checkIfSuccessful(clientResponse);
+ return clientResponse.getEntity(LineageGraphResult.class);
+ }
+
public String getDimensionRelations(String dimensionType, String dimensionName) throws FalconCLIException {
return sendMetadataDiscoveryRequest(MetadataOperations.RELATIONS, dimensionType, dimensionName, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
new file mode 100644
index 0000000..acf5d11
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/resource/LineageGraphResult.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import org.apache.commons.lang.StringUtils;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * JAXB-bound result returned by the APIs that produce a DAG: an array of vertex names plus an
+ * array of directed, labelled edges. The graph can also be rendered as Graphviz dot text.
+ */
+@XmlRootElement(name = "result")
+@XmlAccessorType (XmlAccessType.FIELD)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class LineageGraphResult {
+
+    private String[] vertices;
+
+    @XmlElement(name="edges")
+    private Edge[] edges;
+
+    private static final JAXBContext JAXB_CONTEXT;
+
+    static {
+        try {
+            JAXB_CONTEXT = JAXBContext.newInstance(LineageGraphResult.class);
+        } catch (JAXBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public LineageGraphResult() {
+        // no-arg constructor needed by JAXB
+    }
+
+    /**
+     * A directed edge of the DAG, running from one vertex name to another and carrying a label.
+     */
+    @XmlRootElement(name = "edge")
+    @XmlAccessorType(XmlAccessType.FIELD)
+    public static class Edge {
+        @XmlElement
+        private String from;
+        @XmlElement
+        private String to;
+        @XmlElement
+        private String label;
+
+        public Edge() {
+
+        }
+
+        public Edge(String from, String to, String label) {
+            this.from = from;
+            this.to = to;
+            this.label = label;
+        }
+
+        public String getFrom() {
+            return from;
+        }
+
+        public void setFrom(String from) {
+            this.from = from;
+        }
+
+        public String getTo() {
+            return to;
+        }
+
+        public void setTo(String to) {
+            this.to = to;
+        }
+
+        public String getLabel() {
+            return label;
+        }
+
+        public void setLabel(String label) {
+            this.label = label;
+        }
+
+        /**
+         * Renders this edge as one dot statement terminated by a newline.
+         *
+         * @return the dot statement, or an empty string when from, to or label is blank
+         */
+        public String getDotNotation() {
+            // an edge missing any of its three parts is not rendered at all
+            if (StringUtils.isBlank(from) || StringUtils.isBlank(to) || StringUtils.isBlank(label)) {
+                return "";
+            }
+
+            StringBuilder dot = new StringBuilder();
+            dot.append("\"").append(from).append("\"")
+                .append(" -> ")
+                .append("\"").append(to).append("\"")
+                .append(" [ label = \"").append(label).append("\" ] \n");
+            return dot.toString();
+        }
+
+        @Override
+        public String toString() {
+            return getDotNotation();
+        }
+
+    }
+
+
+    /**
+     * Renders the whole graph as a dot digraph: one quoted line per vertex, followed by the
+     * dot statement of every edge.
+     *
+     * @return the dot text; a null vertices or edges array contributes nothing
+     */
+    public String getDotNotation() {
+        StringBuilder dot = new StringBuilder("digraph g{ \n");
+
+        if (vertices != null) {
+            for (String vertexName : vertices) {
+                dot.append("\"").append(vertexName).append("\"").append("\n");
+            }
+        }
+
+        if (edges != null) {
+            for (Edge edge : edges) {
+                dot.append(edge.getDotNotation());
+            }
+        }
+
+        return dot.append("}\n").toString();
+    }
+
+    public String[] getVertices() {
+        return vertices;
+    }
+
+    public void setVertices(String[] vertices) {
+        this.vertices = vertices;
+    }
+
+    public Edge[] getEdges() {
+        return edges;
+    }
+
+    public void setEdges(Edge[] edges) {
+        this.edges = edges;
+    }
+
+
+    @Override
+    public String toString() {
+        return getDotNotation();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index d8199dd..d37cf8c 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -56,6 +56,9 @@ Optional Args : -fields <<field1,field2>> -filterBy <<field1:value1,field2:value
<a href="./Restapi/EntityList.html">Optional params described here.</a>
+
+
+
---+++Summary
Summary of entities of a particular type and a cluster will be listed. Entity summary has N most recent instances of entity.
@@ -255,6 +258,20 @@ $FALCON_HOME/bin/falcon instance -type <<feed/process>> -name <<name>> -params -
---++ Metadata Lineage Options
+---+++Lineage
+
+Returns the relationship between processes and feeds in a given pipeline in <a href="http://www.graphviz.org/content/dot-language">dot</a> format.
+To view a graphical representation of the DAG, paste the output into an online graphviz viewer like <a href="http://graphviz-dev.appspot.com/">this</a>.
+
+
+Usage:
+
+$FALCON_HOME/bin/falcon metadata -lineage -pipeline my-pipeline
+
+pipeline is a mandatory option.
+
+
+
---+++ Vertex
Get the vertex with the specified id.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/EntityLineage.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityLineage.twiki b/docs/src/site/twiki/restapi/EntityLineage.twiki
new file mode 100644
index 0000000..ea747b1
--- /dev/null
+++ b/docs/src/site/twiki/restapi/EntityLineage.twiki
@@ -0,0 +1,39 @@
+---++ GET api/metadata/lineage/entities?pipeline=:pipeline
+ * <a href="#Description">Description</a>
+ * <a href="#Parameters">Parameters</a>
+ * <a href="#Results">Results</a>
+ * <a href="#Examples">Examples</a>
+
+---++ Description
+It returns the graph depicting the relationship between the various processes and feeds in a given pipeline.
+
+---++ Parameters
+ * :pipeline is the name of the pipeline
+
+---++ Results
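+It returns a JSON object describing the graph: "vertices" lists the names of the processes in the pipeline, and each entry in "edges" connects the process producing a feed ("from") to a process consuming it ("to"), with the feed name as "label".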
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/metadata/lineage/entities?pipeline=my-pipeline
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "vertices": ["my-minutely-process", "my-hourly-process"],
+ "edges":
+ [
+ {
+ "from" : "my-minutely-process",
+ "to" : "my-hourly-process",
+ "label" : "my-minutely-feed"
+ },
+ {
+ "from" : "my-hourly-process",
+ "to" : "my-minutely-process",
+ "label" : "my-hourly-feedback"
+ }
+ ]
+}
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index a87818b..2368631 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -77,6 +77,7 @@ See also: [[../Security.twiki][Security in Falcon]]
| GET | [[AdjacentVertices][api/metadata/lineage/vertices/:id/:direction]] | get the adjacent vertices or edges of the vertex with the specified direction |
| GET | [[AllEdges][api/metadata/lineage/edges/all]] | get all edges |
| GET | [[Edge][api/metadata/lineage/edges/:id]] | get the edge with the specified id |
+| GET | [[EntityLineage][api/metadata/lineage/entities?pipeline=:name]] | Get lineage graph for processes and feeds in the specified pipeline |
---++ REST Call on Metadata Discovery Resource
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5a6c095..1b3a6c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -625,6 +625,12 @@
</dependency>
<dependency>
+ <groupId>com.tinkerpop.gremlin</groupId>
+ <artifactId>gremlin-java</artifactId>
+ <version>2.6.0</version>
+ </dependency>
+
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>3.0.3.RELEASE</version>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/pom.xml
----------------------------------------------------------------------
diff --git a/prism/pom.xml b/prism/pom.xml
index 26e577d..43cc4b4 100644
--- a/prism/pom.xml
+++ b/prism/pom.xml
@@ -61,6 +61,13 @@
</dependency>
<dependency>
+ <groupId>com.tinkerpop.gremlin</groupId>
+ <artifactId>gremlin-java</artifactId>
+ </dependency>
+
+
+
+ <dependency>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-test-util</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
index 2404be4..0c6b2b6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
@@ -18,6 +18,7 @@
package org.apache.falcon.resource.metadata;
+import com.google.common.collect.Sets;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Element;
@@ -25,12 +26,22 @@ import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.VertexQuery;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
+import com.tinkerpop.gremlin.java.GremlinPipeline;
import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.metadata.GraphUtils;
import org.apache.falcon.metadata.RelationshipLabel;
import org.apache.falcon.metadata.RelationshipProperty;
import org.apache.falcon.metadata.RelationshipType;
+import org.apache.falcon.monitors.Dimension;
+import org.apache.falcon.monitors.Monitored;
+import org.apache.falcon.resource.LineageGraphResult;
import org.apache.falcon.util.StartupProperties;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
@@ -48,7 +59,10 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Jersey Resource for lineage metadata operations.
@@ -81,6 +95,37 @@ public class LineageMetadataResource extends AbstractMetadataResource {
}
}
+
+ /**
+  * Get the lineage graph relating the processes of a pipeline through the feeds they exchange.
+  *
+  * GET http://host/metadata/lineage/entities?pipeline=name
+  *
+  * @param pipeline name of the pipeline, mandatory
+  * @return an OK response whose entity is the LineageGraphResult built for the pipeline's processes;
+  *         a BAD_REQUEST error is raised when the name is blank or no vertex carries that name,
+  *         and an INTERNAL_SERVER_ERROR error for any other failure
+  */
+ @GET
+ @Path("/entities")
+ @Produces({MediaType.APPLICATION_JSON})
+ @Monitored(event = "entity-lineage")
+ public Response getEntityLineageGraph(@Dimension("pipeline") @QueryParam("pipeline") final String pipeline) {
+     LOG.info("Get lineage Graph for pipeline:({})", pipeline);
+
+     // A missing pipeline name is a client error; reject it before the try block so the
+     // catch-all below cannot turn it into a server error.
+     if (StringUtils.isBlank(pipeline)) {
+         throw FalconWebException.newException("Pipeline name can not be blank",
+                 Response.Status.BAD_REQUEST);
+     }
+
+     try {
+         Iterable<Vertex> pipelineNode = getGraph().getVertices(RelationshipProperty.NAME.getName(),
+                 pipeline);
+         if (!pipelineNode.iterator().hasNext()) {
+             throw FalconWebException.newException("No pipelines found for " + pipeline,
+                     Response.Status.BAD_REQUEST);
+         }
+         Vertex v = pipelineNode.iterator().next(); // pipeline names are unique
+
+         // walk the incoming "pipelines" edges and keep only the process entity vertices
+         Iterable<Vertex> processes = new GremlinPipeline(v).in(RelationshipLabel.PIPELINES.getName())
+                 .has(RelationshipProperty.TYPE.getName(), RelationshipType.PROCESS_ENTITY.getName());
+         return Response.ok(buildJSONGraph(processes)).build();
+     } catch (WebApplicationException e) {
+         // already carries the intended HTTP status (e.g. BAD_REQUEST above); do not re-wrap as a 500
+         throw e;
+     } catch (Exception e) {
+         LOG.error("Error while fetching entity lineage: ", e);
+         throw FalconWebException.newException(e, Response.Status.INTERNAL_SERVER_ERROR);
+     }
+ }
+
/**
* Get all vertices.
*
@@ -392,6 +437,63 @@ public class LineageMetadataResource extends AbstractMetadataResource {
return response;
}
+ /**
+  * Builds the lineage graph for the given process vertices. Every process name becomes a vertex;
+  * an edge is added from the producer of a feed to each process consuming it, labelled with the
+  * feed name. Feeds that are not both produced and consumed by these processes are dropped.
+  *
+  * @param processes process vertices to include; null yields an empty graph
+  * @return the graph holding the vertex names and edges
+  * @throws FalconException if a process entity cannot be read from the configuration store
+  */
+ private LineageGraphResult buildJSONGraph(Iterable<Vertex> processes) throws FalconException {
+     List<String> vertexNames = new LinkedList<String>();
+     List<LineageGraphResult.Edge> lineageEdges = new LinkedList<LineageGraphResult.Edge>();
+
+     if (processes != null) {
+         // feed name -> process producing it (a later producer replaces an earlier one)
+         Map<String, String> producerByFeed = new HashMap<String, String>();
+         // feed name -> processes consuming it
+         Map<String, List<String>> consumersByFeed = new HashMap<String, List<String>>();
+
+         for (Vertex processVertex : processes) {
+             String processName = processVertex.getProperty(RelationshipProperty.NAME.getName());
+             vertexNames.add(processName);
+
+             Process entity = ConfigurationStore.get().get(EntityType.PROCESS, processName);
+             if (entity == null) {
+                 continue;
+             }
+
+             if (entity.getOutputs() != null) {
+                 for (Output output : entity.getOutputs().getOutputs()) {
+                     producerByFeed.put(output.getFeed(), processName);
+                 }
+             }
+
+             if (entity.getInputs() != null) {
+                 for (Input input : entity.getInputs().getInputs()) {
+                     // create the consumer list on first sight of the feed, then append
+                     List<String> consumers = consumersByFeed.get(input.getFeed());
+                     if (consumers == null) {
+                         consumers = new LinkedList<String>();
+                         consumersByFeed.put(input.getFeed(), consumers);
+                     }
+                     consumers.add(processName);
+                 }
+             }
+         }
+         LOG.debug("feedProducerMap = {}", producerByFeed);
+         LOG.debug("feedConsumerMap = {}", consumersByFeed);
+
+         // only feeds that are both produced and consumed link two processes
+         Set<String> linkingFeeds = Sets.intersection(producerByFeed.keySet(), consumersByFeed.keySet());
+         for (String feedName : linkingFeeds) {
+             String producerName = producerByFeed.get(feedName);
+             for (String consumerName : consumersByFeed.get(feedName)) {
+                 lineageEdges.add(new LineageGraphResult.Edge(producerName, consumerName, feedName));
+             }
+         }
+     }
+
+     LineageGraphResult result = new LineageGraphResult();
+     result.setEdges(lineageEdges.toArray(new LineageGraphResult.Edge[lineageEdges.size()]));
+     result.setVertices(vertexNames.toArray(new String[vertexNames.size()]));
+     LOG.debug("result = {}", result);
+     return result;
+ }
+
private static void validateInputs(String errorMsg, String... inputs) {
for (String input : inputs) {
if (StringUtils.isEmpty(input)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index cabb44c..ac0e51f 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -351,6 +351,14 @@ public class LineageMetadataResourceTest {
}
}
+ // Adds the consumer process to the test context, then expects an OK status for "testPipeline".
+ @Test
+ public void testEntityLineage() throws Exception {
+     testContext.addConsumerProcess();
+
+     Response lineageResponse = new LineageMetadataResource().getEntityLineageGraph("testPipeline");
+
+     Assert.assertEquals(lineageResponse.getStatus(), Response.Status.OK.getStatusCode());
+ }
+
private void assertBasicVertexProperties(Vertex vertex, Map vertexProperties) {
RelationshipProperty[] properties = {
RelationshipProperty.NAME,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
index aaddf62..6f798a8 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
@@ -51,6 +51,7 @@ public class MetadataTestContext {
public static final String OPERATION = "GENERATE";
public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+ public static final String CHILD_PROCESS_ENTITY_NAME = "sample-child-process";
public static final String PROCESS_ENTITY_NAME = "sample-process";
public static final String COLO_NAME = "west-coast";
public static final String WORKFLOW_NAME = "imp-click-join-workflow";
@@ -171,6 +172,23 @@ public class MetadataTestContext {
configStore.publish(EntityType.PROCESS, processEntity);
}
+ /**
+  * Publishes a second process in "testPipeline" whose feed roles are swapped: the context's
+  * input feeds are registered as its outputs and the context's output feeds as its inputs.
+  */
+ public void addConsumerProcess() throws Exception {
+     org.apache.falcon.entity.v0.process.Process childProcess = EntityBuilderTestUtil.buildProcess(
+             CHILD_PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical", "testPipeline");
+     EntityBuilderTestUtil.addProcessWorkflow(childProcess, WORKFLOW_NAME, WORKFLOW_VERSION);
+
+     // the context's input feeds become outputs of the child process
+     for (Feed producedFeed : inputFeeds) {
+         EntityBuilderTestUtil.addOutput(childProcess, producedFeed);
+     }
+
+     // the context's output feeds become inputs of the child process
+     for (Feed consumedFeed : outputFeeds) {
+         EntityBuilderTestUtil.addInput(childProcess, consumedFeed);
+     }
+
+     configStore.publish(EntityType.PROCESS, childProcess);
+ }
+
public void addInstance() throws Exception {
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
WorkflowExecutionContext.Type.POST_PROCESSING);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/45a7b989/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 9c6ad80..b50999d 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -487,6 +487,30 @@ public class FalconCLIIT {
+ " -file " + createTempJobPropertiesFile()), 0);
}
+
+ // Submits a cluster, two feeds and a process, then checks that the lineage command exits with 0.
+ @Test
+ public void testEntityLineage() throws Exception {
+     TestContext context = new TestContext();
+     Map<String, String> overlay = context.getUniqueOverlay();
+
+     // cluster
+     String clusterFile = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+     context.setCluster(overlay.get("cluster"));
+     Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + clusterFile), 0);
+
+     // feeds
+     String firstFeedFile = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+     Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + firstFeedFile), 0);
+
+     String secondFeedFile = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+     Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + secondFeedFile), 0);
+
+     // process
+     String processFile = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+     Assert.assertEquals(executeWithURL("entity -submit -type process -file " + processFile), 0);
+
+     // lineage query for the pipeline
+     Assert.assertEquals(executeWithURL("metadata -lineage -pipeline testPipeline"), 0);
+ }
+
@Test
public void testEntityPaginationFilterByCommands() throws Exception {