You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/06/02 18:07:33 UTC
[2/2] nifi git commit: NIFI-4009: Added support for several key
functions in RecordPath
NIFI-4009: Added support for several key functions in RecordPath
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #1881
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32314d70
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32314d70
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32314d70
Branch: refs/heads/master
Commit: 32314d70fdea6e87fa38729956fa24b3298f5727
Parents: 0bddcfe
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed May 31 12:17:28 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Fri Jun 2 14:06:05 2017 -0400
----------------------------------------------------------------------
.../apache/nifi/record/path/RecordPathLexer.g | 19 +-
.../apache/nifi/record/path/RecordPathParser.g | 58 +++-
.../org/apache/nifi/record/path/RecordPath.java | 3 +-
.../path/filter/BinaryOperatorFilter.java | 7 +-
.../nifi/record/path/filter/Contains.java | 32 ++
.../nifi/record/path/filter/ContainsRegex.java | 77 +++++
.../nifi/record/path/filter/EndsWith.java | 33 ++
.../nifi/record/path/filter/EqualsFilter.java | 4 +-
.../nifi/record/path/filter/FunctionFilter.java | 40 +++
.../apache/nifi/record/path/filter/IsBlank.java | 41 +++
.../apache/nifi/record/path/filter/IsEmpty.java | 40 +++
.../nifi/record/path/filter/MatchesRegex.java | 77 +++++
.../record/path/filter/NotEqualsFilter.java | 2 +-
.../nifi/record/path/filter/NotFilter.java | 38 +++
.../record/path/filter/RecordPathFilter.java | 2 +-
.../nifi/record/path/filter/StartsWith.java | 33 ++
.../path/filter/StringComparisonFilter.java | 57 ++++
.../nifi/record/path/functions/Concat.java | 55 ++++
.../nifi/record/path/functions/Replace.java | 64 ++++
.../nifi/record/path/functions/ReplaceNull.java | 64 ++++
.../record/path/functions/ReplaceRegex.java | 93 ++++++
.../nifi/record/path/functions/Substring.java | 96 ++++++
.../record/path/functions/SubstringAfter.java | 65 ++++
.../path/functions/SubstringAfterLast.java | 65 ++++
.../record/path/functions/SubstringBefore.java | 61 ++++
.../path/functions/SubstringBeforeLast.java | 61 ++++
.../nifi/record/path/paths/PredicatePath.java | 11 +-
.../record/path/paths/RecordPathCompiler.java | 139 +++++++++
.../record/path/paths/RecordPathSegment.java | 33 +-
.../nifi/record/path/util/RecordPathUtils.java | 42 +++
.../apache/nifi/record/path/TestRecordPath.java | 304 ++++++++++++++++++-
.../src/main/asciidoc/record-path-guide.adoc | 252 +++++++++++++++
.../nifi/processors/standard/UpdateRecord.java | 2 +-
33 files changed, 1912 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
index 6240e93..cd466f7 100644
--- a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
+++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
@@ -69,6 +69,8 @@ CHILD_SEPARATOR : '/';
DESCENDANT_SEPARATOR : '//';
LBRACKET : '[';
RBRACKET : ']';
+LPAREN : '(';
+RPAREN : ')';
NUMBER : '-'? ('0'..'9')+;
QUOTE : '\'';
COMMA : ',';
@@ -92,9 +94,20 @@ WHITESPACE : SPACE+ { skip(); };
fragment SPACE : ' ' | '\t' | '\n' | '\r' | '\u000C';
-RAW_FIELD_NAME : (
- ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>')
- ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ')*
+// filter functions
+CONTAINS : 'contains';
+CONTAINS_REGEX : 'containsRegex';
+ENDS_WITH : 'endsWith';
+STARTS_WITH : 'startsWith';
+IS_BLANK : 'isBlank';
+IS_EMPTY : 'isEmpty';
+MATCHES_REGEX : 'matchesRegex';
+NOT : 'not';
+
+
+IDENTIFIER : (
+ ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>' | '(' | ')' )
+ ~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ' | '(' | ')' )*
);
// STRINGS
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
index e9bad38..5e406cb 100644
--- a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
+++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
@@ -38,6 +38,8 @@ tokens {
PREDICATE;
OPERATOR;
RELATIVE_PATH;
+ FUNCTION;
+ ARGUMENTS;
}
@header {
@@ -90,7 +92,7 @@ multipleStringLiterals : STRING_LITERAL (COMMA! STRING_LITERAL)*;
stringList : multipleStringLiterals ->
^(STRING_LIST multipleStringLiterals);
-rawOrLiteral : RAW_FIELD_NAME | STRING_LITERAL;
+rawOrLiteral : IDENTIFIER | STRING_LITERAL;
@@ -118,6 +120,7 @@ index : LBRACKET! indexOrKey RBRACKET!;
+
//
// Predicates
//
@@ -125,14 +128,53 @@ operator : LESS_THAN | LESS_THAN_EQUAL | GREATER_THAN | GREATER_THAN_EQUAL | EQU
literal : NUMBER | STRING_LITERAL;
-expression : path | literal;
+expression : path | literal | function;
+
+operation : expression operator^ expression;
+
+filter : filterFunction | operation;
+
+predicate : LBRACKET filter RBRACKET ->
+ ^(PREDICATE filter);
+
+
+//
+// Functions
+//
+
+argument : expression;
+
+optionalArgument : argument?;
-operation : relativePath operator^ expression;
+argumentList : optionalArgument (COMMA argument)* ->
+ ^(ARGUMENTS optionalArgument argument*);
-predicate : LBRACKET operation RBRACKET ->
- ^(PREDICATE operation);
+function : IDENTIFIER LPAREN argumentList RPAREN ->
+ ^(FUNCTION IDENTIFIER argumentList);
+filterFunctionNames : CONTAINS | CONTAINS_REGEX | ENDS_WITH | STARTS_WITH | IS_BLANK | IS_EMPTY | MATCHES_REGEX;
+
+filterArgument : expression | filterFunction;
+
+optionalFilterArgument : filterArgument?;
+
+filterArgumentList : optionalFilterArgument (COMMA filterArgument)* ->
+ ^(ARGUMENTS optionalFilterArgument filterArgument*);
+
+simpleFilterFunction : filterFunctionNames LPAREN filterArgumentList RPAREN ->
+ ^(FUNCTION filterFunctionNames filterArgumentList);
+
+simpleFilterFunctionOrOperation : simpleFilterFunction | operation;
+
+notFunctionArgList : simpleFilterFunctionOrOperation ->
+ ^(ARGUMENTS simpleFilterFunctionOrOperation);
+
+notFilterFunction : NOT LPAREN notFunctionArgList RPAREN ->
+ ^(FUNCTION NOT notFunctionArgList);
+
+filterFunction : simpleFilterFunction | notFilterFunction;
+
//
@@ -191,5 +233,7 @@ relativePath : currentOrParent relativePathSegment? ->
path : absolutePath | relativePath;
-pathExpression : path EOF ->
- ^(PATH_EXPRESSION path);
+pathOrFunction : path | function;
+
+pathExpression : pathOrFunction EOF ->
+ ^(PATH_EXPRESSION pathOrFunction);
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
index 482390d..fcf651c 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
@@ -54,10 +54,11 @@ public interface RecordPath {
* against a Record via {@link #evaluate(Record)} and then have a Relative RecordPath evaluated against
* the results. This method will throw an Exception if this RecordPath is an Absolute RecordPath.
*
+ * @param record the Record to evaluate
* @param contextNode the context node that represents where in the Record the 'current node' or 'context node' is
* @return a RecordPathResult that contains a FieldValue for each field that matches
*/
- RecordPathResult evaluate(FieldValue contextNode);
+ RecordPathResult evaluate(Record record, FieldValue contextNode);
/**
* Indicates whether the RecordPath is an Absolute Path (starts with a '/' character) or a Relative Path (starts with a '.' character).
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
index 6d85436..6e7ea96 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
@@ -34,7 +34,7 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter {
}
@Override
- public Stream<FieldValue> filter(final FieldValue currentNode, final RecordPathEvaluationContext context) {
+ public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
final Stream<FieldValue> rhsStream = rhs.evaluate(context);
final Optional<FieldValue> firstMatch = rhsStream
.filter(fieldVal -> fieldVal.getValue() != null)
@@ -48,7 +48,10 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter {
final Object value = fieldValue.getValue();
final Stream<FieldValue> lhsStream = lhs.evaluate(context);
- return lhsStream.filter(fieldVal -> test(fieldVal, value));
+ return lhsStream.filter(fieldVal -> {
+ final boolean result = test(fieldVal, value);
+ return invert ? !result : result;
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java
new file mode 100644
index 0000000..385ed7a
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public class Contains extends StringComparisonFilter {
+
+ public Contains(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
+ super(recordPath, searchValuePath);
+ }
+
+ @Override
+ protected boolean isMatch(final String fieldValue, final String comparison) {
+ return fieldValue.contains(comparison);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java
new file mode 100644
index 0000000..02c3ca9
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.LiteralValuePath;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class ContainsRegex extends FunctionFilter {
+
+ private final RecordPathSegment regexPath;
+
+ private final Pattern compiledPattern;
+
+ public ContainsRegex(RecordPathSegment recordPath, final RecordPathSegment regexPath) {
+ super(recordPath);
+ this.regexPath = regexPath;
+
+ if (regexPath instanceof LiteralValuePath) {
+ final FieldValue fieldValue = ((LiteralValuePath) regexPath).evaluate((RecordPathEvaluationContext) null).findFirst().get();
+ final Object value = fieldValue.getValue();
+ final String regex = DataTypeUtils.toString(value, (String) null);
+ compiledPattern = Pattern.compile(regex);
+ } else {
+ compiledPattern = null;
+ }
+ }
+
+ @Override
+ protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+ final Pattern pattern;
+ if (compiledPattern == null) {
+ final Optional<FieldValue> fieldValueOption = regexPath.evaluate(context).findFirst();
+ if (!fieldValueOption.isPresent()) {
+ return false;
+ }
+
+ final Object value = fieldValueOption.get().getValue();
+ if (value == null) {
+ return false;
+ }
+
+ final String regex = DataTypeUtils.toString(value, (String) null);
+ pattern = Pattern.compile(regex);
+ } else {
+ pattern = compiledPattern;
+ }
+
+ final String searchString = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+ if (searchString == null) {
+ return false;
+ }
+
+ return pattern.matcher(searchString).find();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java
new file mode 100644
index 0000000..fdc9c86
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public class EndsWith extends StringComparisonFilter {
+
+ public EndsWith(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
+ super(recordPath, searchValuePath);
+ }
+
+ @Override
+ protected boolean isMatch(final String fieldValue, final String comparison) {
+ return fieldValue.endsWith(comparison);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
index e03b6df..e51d276 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
@@ -37,10 +37,10 @@ public class EqualsFilter extends BinaryOperatorFilter {
if (rhsValue instanceof Number) {
return compareNumbers((Number) lhsValue, (Number) rhsValue);
} else {
- return false;
+ return lhsValue.toString().equals(rhsValue.toString());
}
} else if (rhsValue instanceof Number) {
- return false;
+ return lhsValue.toString().equals(rhsValue.toString());
}
return lhsValue.equals(rhsValue);
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
new file mode 100644
index 0000000..c7e6114
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public abstract class FunctionFilter implements RecordPathFilter {
+ private final RecordPathSegment recordPath;
+
+ protected FunctionFilter(final RecordPathSegment recordPath) {
+ this.recordPath = recordPath;
+ }
+
+ @Override
+ public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
+ return recordPath.evaluate(context)
+ .filter(fv -> invert ? !test(fv, context) : test(fv, context));
+ }
+
+ protected abstract boolean test(FieldValue fieldValue, final RecordPathEvaluationContext context);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java
new file mode 100644
index 0000000..93011f9
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class IsBlank extends FunctionFilter {
+
+
+ public IsBlank(RecordPathSegment recordPath) {
+ super(recordPath);
+ }
+
+ @Override
+ protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+ final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+ if (fieldVal == null) {
+ return true;
+ }
+
+ return fieldVal.trim().isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java
new file mode 100644
index 0000000..823d2b3
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class IsEmpty extends FunctionFilter {
+
+ public IsEmpty(RecordPathSegment recordPath) {
+ super(recordPath);
+ }
+
+ @Override
+ protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+ final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+ if (fieldVal == null) {
+ return true;
+ }
+
+ return fieldVal.isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java
new file mode 100644
index 0000000..a50f895
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.LiteralValuePath;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class MatchesRegex extends FunctionFilter {
+
+ private final RecordPathSegment regexPath;
+
+ private final Pattern compiledPattern;
+
+ public MatchesRegex(RecordPathSegment recordPath, final RecordPathSegment regexPath) {
+ super(recordPath);
+ this.regexPath = regexPath;
+
+ if (regexPath instanceof LiteralValuePath) {
+ final FieldValue fieldValue = ((LiteralValuePath) regexPath).evaluate((RecordPathEvaluationContext) null).findFirst().get();
+ final Object value = fieldValue.getValue();
+ final String regex = DataTypeUtils.toString(value, (String) null);
+ compiledPattern = Pattern.compile(regex);
+ } else {
+ compiledPattern = null;
+ }
+ }
+
+ @Override
+ protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+ final Pattern pattern;
+ if (compiledPattern == null) {
+ final Optional<FieldValue> fieldValueOption = regexPath.evaluate(context).findFirst();
+ if (!fieldValueOption.isPresent()) {
+ return false;
+ }
+
+ final Object value = fieldValueOption.get().getValue();
+ if (value == null) {
+ return false;
+ }
+
+ final String regex = DataTypeUtils.toString(value, (String) null);
+ pattern = Pattern.compile(regex);
+ } else {
+ pattern = compiledPattern;
+ }
+
+ final String searchString = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+ if (searchString == null) {
+ return false;
+ }
+
+ return pattern.matcher(searchString).matches();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
index 159da4a..d632519 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
@@ -30,7 +30,7 @@ public class NotEqualsFilter extends BinaryOperatorFilter {
protected boolean test(final FieldValue fieldValue, final Object rhsValue) {
final Object lhsValue = fieldValue.getValue();
if (lhsValue == null) {
- return rhsValue != null;
+ return false;
}
if (lhsValue instanceof Number) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
new file mode 100644
index 0000000..bbe38ed
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+
+public class NotFilter implements RecordPathFilter {
+ private final RecordPathFilter filter;
+
+ public NotFilter(final RecordPathFilter filter) {
+ this.filter = filter;
+ }
+
+ @Override
+ public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
+ return filter.filter(context, !invert);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
index 501fca0..389f6d3 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
@@ -24,6 +24,6 @@ import org.apache.nifi.record.path.RecordPathEvaluationContext;
public interface RecordPathFilter {
- Stream<FieldValue> filter(FieldValue currentNode, RecordPathEvaluationContext context);
+ Stream<FieldValue> filter(RecordPathEvaluationContext context, boolean invert);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java
new file mode 100644
index 0000000..4ddd4e8
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public class StartsWith extends StringComparisonFilter {
+
+ public StartsWith(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
+ super(recordPath, searchValuePath);
+ }
+
+ @Override
+ protected boolean isMatch(final String fieldValue, final String comparison) {
+ return fieldValue.startsWith(comparison);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java
new file mode 100644
index 0000000..e7c35a0
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.Optional;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public abstract class StringComparisonFilter extends FunctionFilter {
+
+ private final RecordPathSegment searchValuePath;
+
+ public StringComparisonFilter(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
+ super(recordPath);
+ this.searchValuePath = searchValuePath;
+ }
+
+ @Override
+ protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+ final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+ if (fieldVal == null) {
+ return false;
+ }
+
+ final Optional<FieldValue> firstValue = searchValuePath.evaluate(context).findFirst();
+ if (!firstValue.isPresent()) {
+ return false;
+ }
+
+ final String searchValue = DataTypeUtils.toString(firstValue.get().getValue(), (String) null);
+ if (searchValue == null) {
+ return false;
+ }
+
+ return isMatch(fieldVal, searchValue);
+ }
+
+ protected abstract boolean isMatch(String fieldValue, String comparison);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java
new file mode 100644
index 0000000..daeee05
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class Concat extends RecordPathSegment {
+ private final RecordPathSegment[] valuePaths;
+
+ public Concat(final RecordPathSegment[] valuePaths, final boolean absolute) {
+ super("concat", null, absolute);
+ this.valuePaths = valuePaths;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ Stream<FieldValue> concatenated = Stream.empty();
+
+ for (final RecordPathSegment valuePath : valuePaths) {
+ final Stream<FieldValue> stream = valuePath.evaluate(context);
+ concatenated = Stream.concat(concatenated, stream);
+ }
+
+ final StringBuilder sb = new StringBuilder();
+ concatenated.forEach(fv -> sb.append(DataTypeUtils.toString(fv.getValue(), (String) null)));
+
+ final RecordField field = new RecordField("concat", RecordFieldType.STRING.getDataType());
+ final FieldValue responseValue = new StandardFieldValue(sb.toString(), field, null);
+ return Stream.of(responseValue);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java
new file mode 100644
index 0000000..e05dca7
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class Replace extends RecordPathSegment {
+
+ private final RecordPathSegment recordPath;
+ private final RecordPathSegment searchValuePath;
+ private final RecordPathSegment replacementValuePath;
+
+ public Replace(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final RecordPathSegment replacementValue, final boolean absolute) {
+ super("replace", null, absolute);
+
+ this.recordPath = recordPath;
+ this.searchValuePath = searchValue;
+ this.replacementValuePath = replacementValue;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+ return fieldValues.filter(fv -> fv.getValue() != null)
+ .map(fv -> {
+ final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+ if (searchValue == null) {
+ return fv;
+ }
+
+ final String replacementValue = RecordPathUtils.getFirstStringValue(replacementValuePath, context);
+ if (replacementValue == null) {
+ return fv;
+ }
+
+ final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+ final String replaced = value.replace(searchValue, replacementValue);
+ return new StandardFieldValue(replaced, fv.getField(), fv.getParent().orElse(null));
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java
new file mode 100644
index 0000000..09df8a7
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public class ReplaceNull extends RecordPathSegment {
+
+ private final RecordPathSegment recordPath;
+ private final RecordPathSegment replacementValuePath;
+
+ public ReplaceNull(final RecordPathSegment recordPath, final RecordPathSegment replacementValue, final boolean absolute) {
+ super("replaceNull", null, absolute);
+
+ this.recordPath = recordPath;
+ this.replacementValuePath = replacementValue;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+ return fieldValues
+ .map(fv -> {
+ if (fv.getValue() != null) {
+ return fv;
+ }
+
+ final Optional<FieldValue> replacementOption = replacementValuePath.evaluate(context).findFirst();
+ if (!replacementOption.isPresent()) {
+ return fv;
+ }
+
+ final FieldValue replacementFieldValue = replacementOption.get();
+ final Object replacementValue = replacementFieldValue.getValue();
+ if (replacementValue == null) {
+ return fv;
+ }
+
+ return new StandardFieldValue(replacementValue, fv.getField(), fv.getParent().orElse(null));
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java
new file mode 100644
index 0000000..06f9f53
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.LiteralValuePath;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class ReplaceRegex extends RecordPathSegment {
+
+ private final RecordPathSegment recordPath;
+ private final RecordPathSegment searchValuePath;
+ private final RecordPathSegment replacementValuePath;
+
+ private final Pattern compiledPattern;
+
+ public ReplaceRegex(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final RecordPathSegment replacementValue, final boolean absolute) {
+ super("replaceRegex", null, absolute);
+
+ this.recordPath = recordPath;
+ this.searchValuePath = searchValue;
+ if (searchValue instanceof LiteralValuePath) {
+ final FieldValue fieldValue = ((LiteralValuePath) searchValue).evaluate((RecordPathEvaluationContext) null).findFirst().get();
+ final Object value = fieldValue.getValue();
+ final String regex = DataTypeUtils.toString(value, (String) null);
+ compiledPattern = Pattern.compile(regex);
+ } else {
+ compiledPattern = null;
+ }
+
+ this.replacementValuePath = replacementValue;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+ return fieldValues.filter(fv -> fv.getValue() != null)
+ .map(fv -> {
+ final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+
+ // Determine the Replacement Value
+ final String replacementValue = RecordPathUtils.getFirstStringValue(replacementValuePath, context);
+ if (replacementValue == null) {
+ return fv;
+ }
+
+ final Pattern pattern;
+ if (compiledPattern == null) {
+ final Optional<FieldValue> fieldValueOption = searchValuePath.evaluate(context).findFirst();
+ if (!fieldValueOption.isPresent()) {
+ return fv;
+ }
+
+ final Object fieldValue = fieldValueOption.get().getValue();
+ if (value == null) {
+ return fv;
+ }
+
+ final String regex = DataTypeUtils.toString(fieldValue, (String) null);
+ pattern = Pattern.compile(regex);
+ } else {
+ pattern = compiledPattern;
+ }
+
+ final String replaced = pattern.matcher(value).replaceAll(replacementValue);
+ return new StandardFieldValue(replaced, fv.getField(), fv.getParent().orElse(null));
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java
new file mode 100644
index 0000000..89d2f32
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class Substring extends RecordPathSegment {
+ private final RecordPathSegment recordPath;
+ private final RecordPathSegment startIndexPath;
+ private final RecordPathSegment endIndexPath;
+
+ public Substring(final RecordPathSegment recordPath, final RecordPathSegment startIndex, final RecordPathSegment endIndex, final boolean absolute) {
+ super("substring", null, absolute);
+
+ this.recordPath = recordPath;
+ this.startIndexPath = startIndex;
+ this.endIndexPath = endIndex;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+ return fieldValues.filter(fv -> fv.getValue() != null)
+ .map(fv -> {
+ final OptionalInt startIndex = getIndex(startIndexPath, context);
+ if (!startIndex.isPresent()) {
+ return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+ }
+
+ final OptionalInt endIndex = getIndex(endIndexPath, context);
+ if (!endIndex.isPresent()) {
+ return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+ }
+
+ final int start = startIndex.getAsInt();
+ final int end = endIndex.getAsInt();
+
+ final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+
+ // Allow for negative indices to be used to reference offset from string length. We add 1 here because we want -1 to refer
+ // to the actual length of the string.
+ final int evaluatedEndIndex = end < 0 ? value.length() + 1 + end : end;
+ final int evaluatedStartIndex = start < 0 ? value.length() + 1 + start : start;
+
+ if (evaluatedEndIndex <= evaluatedStartIndex || evaluatedStartIndex < 0 || evaluatedStartIndex > value.length()) {
+ return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+ }
+
+ final String substring = value.substring(evaluatedStartIndex, Math.min(evaluatedEndIndex, value.length()));
+ return new StandardFieldValue(substring, fv.getField(), fv.getParent().orElse(null));
+ });
+ }
+
+ private OptionalInt getIndex(final RecordPathSegment indexSegment, final RecordPathEvaluationContext context) {
+ final Optional<FieldValue> firstFieldValueOption = indexSegment.evaluate(context).findFirst();
+ if (!firstFieldValueOption.isPresent()) {
+ return OptionalInt.empty();
+ }
+
+ final FieldValue fieldValue = firstFieldValueOption.get();
+ final Object indexObject = fieldValue.getValue();
+ if (!DataTypeUtils.isIntegerTypeCompatible(indexObject)) {
+ return OptionalInt.empty();
+ }
+
+ final String fieldName;
+ final RecordField field = fieldValue.getField();
+ fieldName = field == null ? "<Unknown Field>" : field.getFieldName();
+
+ return OptionalInt.of(DataTypeUtils.toInteger(indexObject, fieldName));
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java
new file mode 100644
index 0000000..4bdd6b3
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class SubstringAfter extends RecordPathSegment {
+
+ private final RecordPathSegment recordPath;
+ private final RecordPathSegment searchValuePath;
+
+ public SubstringAfter(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
+ super("substringAfter", null, absolute);
+
+ this.recordPath = recordPath;
+ this.searchValuePath = searchValue;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+ return fieldValues.filter(fv -> fv.getValue() != null)
+ .map(fv -> {
+ final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+ if (searchValue == null || searchValue.isEmpty()) {
+ return fv;
+ }
+
+ final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+ final int index = value.indexOf(searchValue);
+ if (index < 0) {
+ return fv;
+ }
+
+ if (value.length() < index + 1) {
+ return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+ }
+
+ return new StandardFieldValue(value.substring(index + 1), fv.getField(), fv.getParent().orElse(null));
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java
new file mode 100644
index 0000000..71af1b9
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class SubstringAfterLast extends RecordPathSegment {
+
+ private final RecordPathSegment recordPath;
+ private final RecordPathSegment searchValuePath;
+
+ public SubstringAfterLast(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
+ super("substringAfterLast", null, absolute);
+
+ this.recordPath = recordPath;
+ this.searchValuePath = searchValue;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+ return fieldValues.filter(fv -> fv.getValue() != null)
+ .map(fv -> {
+ final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+ if (searchValue == null || searchValue.isEmpty()) {
+ return fv;
+ }
+
+ final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+ final int index = value.lastIndexOf(searchValue);
+ if (index < 0) {
+ return fv;
+ }
+
+ if (value.length() < index + 1) {
+ return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+ }
+
+ return new StandardFieldValue(value.substring(index + 1), fv.getField(), fv.getParent().orElse(null));
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java
new file mode 100644
index 0000000..ddadc8e
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class SubstringBefore extends RecordPathSegment {
+
+ private final RecordPathSegment recordPath;
+ private final RecordPathSegment searchValuePath;
+
+ public SubstringBefore(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
+ super("substringBefore", null, absolute);
+
+ this.recordPath = recordPath;
+ this.searchValuePath = searchValue;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+ return fieldValues.filter(fv -> fv.getValue() != null)
+ .map(fv -> {
+ final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+ if (searchValue == null || searchValue.isEmpty()) {
+ return fv;
+ }
+
+ final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+ final int index = value.indexOf(searchValue);
+ if (index < 0) {
+ return fv;
+ }
+
+ return new StandardFieldValue(value.substring(0, index), fv.getField(), fv.getParent().orElse(null));
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java
new file mode 100644
index 0000000..ed3aa5e
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class SubstringBeforeLast extends RecordPathSegment {
+
+ private final RecordPathSegment recordPath;
+ private final RecordPathSegment searchValuePath;
+
+ public SubstringBeforeLast(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
+ super("substringBeforeLast", null, absolute);
+
+ this.recordPath = recordPath;
+ this.searchValuePath = searchValue;
+ }
+
+ @Override
+ public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+ final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+ return fieldValues.filter(fv -> fv.getValue() != null)
+ .map(fv -> {
+ final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+ if (searchValue == null || searchValue.isEmpty()) {
+ return fv;
+ }
+
+ final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+ final int index = value.lastIndexOf(searchValue);
+ if (index < 0) {
+ return fv;
+ }
+
+ return new StandardFieldValue(value.substring(0, index), fv.getField(), fv.getParent().orElse(null));
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
index 759a848..7e87344 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
@@ -43,17 +43,12 @@ public class PredicatePath extends RecordPathSegment {
context.setContextNode(fieldVal);
try {
// Really what we want to do is filter out Stream<FieldValue> but that becomes very difficult
- // to implement for the RecordPathFilter's. So, instead, we pass the FieldValue to field and
+ // to implement for the RecordPathFilter's. So, instead, we pass
// the RecordPathEvaluationContext and receive back a Stream<FieldValue>. Since this is a Predicate,
// though, we don't want to transform our Stream - we just want to filter it. So we handle this by
// mapping the result back to fieldVal. And since this predicate shouldn't return the same field multiple
- // times, we will limit the stream to 1 element. We also filter out any FieldValue whose value is null.
- // This is done because if we have a predicate like [./iDoNotExist != 'hello'] then the relative path will
- // return a value of null and that will be compared to 'hello'. Since they are not equal, the NotEqualsFilter
- // will return 'true', so we will get back a FieldValue with a null value. This should not make the Predicate
- // true.
- return filter.filter(fieldVal, context)
- .filter(fv -> fv.getValue() != null)
+ // times, we will limit the stream to 1 element.
+ return filter.filter(context, false)
.limit(1)
.map(ignore -> fieldVal);
} finally {
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index 24b872a..ef372ac 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -23,6 +23,7 @@ import static org.apache.nifi.record.path.RecordPathParser.CURRENT_FIELD;
import static org.apache.nifi.record.path.RecordPathParser.DESCENDANT_REFERENCE;
import static org.apache.nifi.record.path.RecordPathParser.EQUAL;
import static org.apache.nifi.record.path.RecordPathParser.FIELD_NAME;
+import static org.apache.nifi.record.path.RecordPathParser.FUNCTION;
import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN;
import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN_EQUAL;
import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN;
@@ -48,17 +49,38 @@ import java.util.function.BiFunction;
import org.antlr.runtime.tree.Tree;
import org.apache.nifi.record.path.NumericRange;
import org.apache.nifi.record.path.exception.RecordPathException;
+import org.apache.nifi.record.path.filter.Contains;
+import org.apache.nifi.record.path.filter.ContainsRegex;
+import org.apache.nifi.record.path.filter.EndsWith;
import org.apache.nifi.record.path.filter.EqualsFilter;
import org.apache.nifi.record.path.filter.GreaterThanFilter;
import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter;
+import org.apache.nifi.record.path.filter.IsBlank;
+import org.apache.nifi.record.path.filter.IsEmpty;
import org.apache.nifi.record.path.filter.LessThanFilter;
import org.apache.nifi.record.path.filter.LessThanOrEqualFilter;
+import org.apache.nifi.record.path.filter.MatchesRegex;
import org.apache.nifi.record.path.filter.NotEqualsFilter;
+import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter;
+import org.apache.nifi.record.path.filter.StartsWith;
+import org.apache.nifi.record.path.functions.Concat;
+import org.apache.nifi.record.path.functions.Replace;
+import org.apache.nifi.record.path.functions.ReplaceNull;
+import org.apache.nifi.record.path.functions.ReplaceRegex;
+import org.apache.nifi.record.path.functions.Substring;
+import org.apache.nifi.record.path.functions.SubstringAfter;
+import org.apache.nifi.record.path.functions.SubstringAfterLast;
+import org.apache.nifi.record.path.functions.SubstringBefore;
+import org.apache.nifi.record.path.functions.SubstringBeforeLast;
public class RecordPathCompiler {
public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) {
+ if (pathTree.getType() == FUNCTION) {
+ return buildPath(pathTree, null, absolute);
+ }
+
RecordPathSegment parent = root;
for (int i = 0; i < pathTree.getChildCount(); i++) {
final Tree child = pathTree.getChild(i);
@@ -168,6 +190,59 @@ public class RecordPathCompiler {
case PATH: {
return compile(tree, new RootPath(), absolute);
}
+ case FUNCTION: {
+ final String functionName = tree.getChild(0).getText();
+ final Tree argumentListTree = tree.getChild(1);
+
+ switch (functionName) {
+ case "substring": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
+ return new Substring(args[0], args[1], args[2], absolute);
+ }
+ case "substringAfter": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new SubstringAfter(args[0], args[1], absolute);
+ }
+ case "substringAfterLast": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new SubstringAfterLast(args[0], args[1], absolute);
+ }
+ case "substringBefore": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new SubstringBefore(args[0], args[1], absolute);
+ }
+ case "substringBeforeLast": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new SubstringBeforeLast(args[0], args[1], absolute);
+ }
+ case "replace": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
+ return new Replace(args[0], args[1], args[2], absolute);
+ }
+ case "replaceRegex": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
+ return new ReplaceRegex(args[0], args[1], args[2], absolute);
+ }
+ case "replaceNull": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new ReplaceNull(args[0], args[1], absolute);
+ }
+ case "concat": {
+ final int numArgs = argumentListTree.getChildCount();
+
+ final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
+ for (int i = 0; i < numArgs; i++) {
+ argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
+ }
+
+ return new Concat(argPaths, absolute);
+ }
+ default: {
+ throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only "
+ + "be used within a predicate, not as a standalone function");
+ }
+ }
+ }
}
throw new RecordPathException("Encountered unexpected token " + tree);
@@ -187,6 +262,8 @@ public class RecordPathCompiler {
return createBinaryOperationFilter(operatorTree, parent, GreaterThanFilter::new, absolute);
case GREATER_THAN_EQUAL:
return createBinaryOperationFilter(operatorTree, parent, GreaterThanOrEqualFilter::new, absolute);
+ case FUNCTION:
+ return createFunctionFilter(operatorTree, absolute);
default:
throw new RecordPathException("Expected an Expression of form <value> <operator> <value> to follow '[' Token but found " + operatorTree);
}
@@ -200,4 +277,66 @@ public class RecordPathCompiler {
final RecordPathSegment rhsPath = buildPath(rhsTree, parent, absolute);
return function.apply(lhsPath, rhsPath);
}
+
+ private static RecordPathFilter createFunctionFilter(final Tree functionTree, final boolean absolute) {
+ final String functionName = functionTree.getChild(0).getText();
+ final Tree argumentListTree = functionTree.getChild(1);
+
+ switch (functionName) {
+ case "contains": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new Contains(args[0], args[1]);
+ }
+ case "matchesRegex": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new MatchesRegex(args[0], args[1]);
+ }
+ case "containsRegex": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new ContainsRegex(args[0], args[1]);
+ }
+ case "startsWith": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new StartsWith(args[0], args[1]);
+ }
+ case "endsWith": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+ return new EndsWith(args[0], args[1]);
+ }
+ case "isEmpty": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
+ return new IsEmpty(args[0]);
+ }
+ case "isBlank": {
+ final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
+ return new IsBlank(args[0]);
+ }
+ case "not": {
+ final int numArgs = argumentListTree.getChildCount();
+ if (numArgs != 1) {
+ throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes 1 argument but got " + numArgs);
+ }
+
+ final Tree childTree = argumentListTree.getChild(0);
+ final RecordPathFilter childFilter = createFilter(childTree, null, absolute);
+ return new NotFilter(childFilter);
+ }
+ }
+
+ throw new RecordPathException("Invalid function name: " + functionName);
+ }
+
+ private static RecordPathSegment[] getArgPaths(final Tree argumentListTree, final int expectedCount, final String functionName, final boolean absolute) {
+ final int numArgs = argumentListTree.getChildCount();
+ if (numArgs != expectedCount) {
+ throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes " + expectedCount + " arguments but got " + numArgs);
+ }
+
+ final RecordPathSegment[] argPaths = new RecordPathSegment[expectedCount];
+ for (int i = 0; i < expectedCount; i++) {
+ argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
+ }
+
+ return argPaths;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
index 92ff010..845ca84 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
@@ -25,7 +25,6 @@ import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.StandardRecordPathEvaluationContext;
-import org.apache.nifi.record.path.util.Filters;
import org.apache.nifi.serialization.record.Record;
public abstract class RecordPathSegment implements RecordPath {
@@ -33,7 +32,7 @@ public abstract class RecordPathSegment implements RecordPath {
private final RecordPathSegment parentPath;
private final boolean absolute;
- RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) {
+ public RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) {
this.path = path;
this.parentPath = parentPath;
this.absolute = absolute;
@@ -98,34 +97,8 @@ public abstract class RecordPathSegment implements RecordPath {
}
@Override
- public final RecordPathResult evaluate(final FieldValue contextNode) {
- final RecordPathEvaluationContext context;
- if (Filters.isRecord(contextNode.getField().getDataType(), contextNode.getValue())) {
- final Record record = (Record) contextNode.getValue();
- if (record == null) {
- return new RecordPathResult() {
- @Override
- public String getPath() {
- return RecordPathSegment.this.getPath();
- }
-
- @Override
- public Stream<FieldValue> getSelectedFields() {
- return Stream.empty();
- }
- };
- }
-
- context = new StandardRecordPathEvaluationContext(record);
- } else {
- final FieldValue parent = contextNode.getParent().orElse(null);
- if (parent == null) {
- context = new StandardRecordPathEvaluationContext(null);
- } else {
- context = new StandardRecordPathEvaluationContext(parent.getParentRecord().orElse(null));
- }
- }
-
+ public final RecordPathResult evaluate(final Record record, final FieldValue contextNode) {
+ final RecordPathEvaluationContext context = new StandardRecordPathEvaluationContext(record);
context.setContextNode(contextNode);
final Stream<FieldValue> selectedFields = evaluate(context);
http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java
new file mode 100644
index 0000000..e8056e4
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.util;
+
+import java.util.Optional;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class RecordPathUtils {
+
+ public static String getFirstStringValue(final RecordPathSegment segment, final RecordPathEvaluationContext context) {
+ final Optional<FieldValue> stringFieldValue = segment.evaluate(context).findFirst();
+ if (!stringFieldValue.isPresent()) {
+ return null;
+ }
+
+ final String stringValue = DataTypeUtils.toString(stringFieldValue.get().getValue(), (String) null);
+ if (stringValue == null) {
+ return null;
+ }
+
+ return stringValue;
+ }
+}