You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/06/02 18:07:33 UTC

[2/2] nifi git commit: NIFI-4009: Added support for several key functions in RecordPath

NIFI-4009: Added support for several key functions in RecordPath

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1881


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/32314d70
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/32314d70
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/32314d70

Branch: refs/heads/master
Commit: 32314d70fdea6e87fa38729956fa24b3298f5727
Parents: 0bddcfe
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed May 31 12:17:28 2017 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Fri Jun 2 14:06:05 2017 -0400

----------------------------------------------------------------------
 .../apache/nifi/record/path/RecordPathLexer.g   |  19 +-
 .../apache/nifi/record/path/RecordPathParser.g  |  58 +++-
 .../org/apache/nifi/record/path/RecordPath.java |   3 +-
 .../path/filter/BinaryOperatorFilter.java       |   7 +-
 .../nifi/record/path/filter/Contains.java       |  32 ++
 .../nifi/record/path/filter/ContainsRegex.java  |  77 +++++
 .../nifi/record/path/filter/EndsWith.java       |  33 ++
 .../nifi/record/path/filter/EqualsFilter.java   |   4 +-
 .../nifi/record/path/filter/FunctionFilter.java |  40 +++
 .../apache/nifi/record/path/filter/IsBlank.java |  41 +++
 .../apache/nifi/record/path/filter/IsEmpty.java |  40 +++
 .../nifi/record/path/filter/MatchesRegex.java   |  77 +++++
 .../record/path/filter/NotEqualsFilter.java     |   2 +-
 .../nifi/record/path/filter/NotFilter.java      |  38 +++
 .../record/path/filter/RecordPathFilter.java    |   2 +-
 .../nifi/record/path/filter/StartsWith.java     |  33 ++
 .../path/filter/StringComparisonFilter.java     |  57 ++++
 .../nifi/record/path/functions/Concat.java      |  55 ++++
 .../nifi/record/path/functions/Replace.java     |  64 ++++
 .../nifi/record/path/functions/ReplaceNull.java |  64 ++++
 .../record/path/functions/ReplaceRegex.java     |  93 ++++++
 .../nifi/record/path/functions/Substring.java   |  96 ++++++
 .../record/path/functions/SubstringAfter.java   |  65 ++++
 .../path/functions/SubstringAfterLast.java      |  65 ++++
 .../record/path/functions/SubstringBefore.java  |  61 ++++
 .../path/functions/SubstringBeforeLast.java     |  61 ++++
 .../nifi/record/path/paths/PredicatePath.java   |  11 +-
 .../record/path/paths/RecordPathCompiler.java   | 139 +++++++++
 .../record/path/paths/RecordPathSegment.java    |  33 +-
 .../nifi/record/path/util/RecordPathUtils.java  |  42 +++
 .../apache/nifi/record/path/TestRecordPath.java | 304 ++++++++++++++++++-
 .../src/main/asciidoc/record-path-guide.adoc    | 252 +++++++++++++++
 .../nifi/processors/standard/UpdateRecord.java  |   2 +-
 33 files changed, 1912 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
index 6240e93..cd466f7 100644
--- a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
+++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathLexer.g
@@ -69,6 +69,8 @@ CHILD_SEPARATOR : '/';
 DESCENDANT_SEPARATOR : '//';
 LBRACKET : '[';
 RBRACKET : ']';
+LPAREN : '(';
+RPAREN : ')';
 NUMBER : '-'? ('0'..'9')+;
 QUOTE : '\'';
 COMMA : ',';
@@ -92,9 +94,20 @@ WHITESPACE : SPACE+ { skip(); };
 fragment SPACE : ' ' | '\t' | '\n' | '\r' | '\u000C';
 
 
-RAW_FIELD_NAME : (
-	~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>')
-	~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ')*
+// filter functions
+CONTAINS : 'contains';
+CONTAINS_REGEX : 'containsRegex';
+ENDS_WITH : 'endsWith';
+STARTS_WITH : 'startsWith';
+IS_BLANK : 'isBlank';
+IS_EMPTY : 'isEmpty';
+MATCHES_REGEX : 'matchesRegex';
+NOT : 'not';
+
+
+IDENTIFIER : (
+	~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>' | '(' | ')' )
+	~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ' | '(' | ')' )*
 );
 
 // STRINGS

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
index e9bad38..5e406cb 100644
--- a/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
+++ b/nifi-commons/nifi-record-path/src/main/antlr3/org/apache/nifi/record/path/RecordPathParser.g
@@ -38,6 +38,8 @@ tokens {
 	PREDICATE;
 	OPERATOR;
 	RELATIVE_PATH;
+	FUNCTION;
+	ARGUMENTS;
 }
 
 @header {
@@ -90,7 +92,7 @@ multipleStringLiterals : STRING_LITERAL (COMMA! STRING_LITERAL)*;
 stringList : multipleStringLiterals ->
 	^(STRING_LIST multipleStringLiterals);
 
-rawOrLiteral : RAW_FIELD_NAME | STRING_LITERAL;
+rawOrLiteral : IDENTIFIER | STRING_LITERAL;
 
 
 
@@ -118,6 +120,7 @@ index : LBRACKET! indexOrKey RBRACKET!;
 
 
 
+
 //
 // Predicates
 //
@@ -125,14 +128,53 @@ operator : LESS_THAN | LESS_THAN_EQUAL | GREATER_THAN | GREATER_THAN_EQUAL | EQU
 
 literal : NUMBER | STRING_LITERAL;
 
-expression : path | literal;
+expression : path | literal | function;
+
+operation : expression operator^ expression;
+
+filter : filterFunction | operation;
+
+predicate : LBRACKET filter RBRACKET ->
+	^(PREDICATE filter);
+
+
+//
+// Functions
+//
+
+argument : expression;
+
+optionalArgument : argument?;
 
-operation : relativePath operator^ expression;
+argumentList : optionalArgument (COMMA argument)* ->
+	^(ARGUMENTS optionalArgument argument*);
 
-predicate : LBRACKET operation RBRACKET ->
-	^(PREDICATE operation);
+function : IDENTIFIER LPAREN argumentList RPAREN ->
+	^(FUNCTION IDENTIFIER argumentList);
 
 
+filterFunctionNames : CONTAINS | CONTAINS_REGEX | ENDS_WITH | STARTS_WITH | IS_BLANK | IS_EMPTY | MATCHES_REGEX;
+
+filterArgument : expression | filterFunction;
+
+optionalFilterArgument : filterArgument?;
+
+filterArgumentList : optionalFilterArgument (COMMA filterArgument)* ->
+	^(ARGUMENTS optionalFilterArgument filterArgument*);
+
+simpleFilterFunction : filterFunctionNames LPAREN filterArgumentList RPAREN ->
+	^(FUNCTION filterFunctionNames filterArgumentList);
+
+simpleFilterFunctionOrOperation : simpleFilterFunction | operation;
+
+notFunctionArgList : simpleFilterFunctionOrOperation ->
+	^(ARGUMENTS simpleFilterFunctionOrOperation);
+
+notFilterFunction : NOT LPAREN notFunctionArgList RPAREN ->
+	^(FUNCTION NOT notFunctionArgList);
+	
+filterFunction : simpleFilterFunction | notFilterFunction; 
+
 
 
 //
@@ -191,5 +233,7 @@ relativePath : currentOrParent relativePathSegment? ->
 
 path : absolutePath | relativePath;
 
-pathExpression : path EOF ->
-	^(PATH_EXPRESSION path);
+pathOrFunction : path | function;
+
+pathExpression : pathOrFunction EOF ->
+	^(PATH_EXPRESSION pathOrFunction);

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
index 482390d..fcf651c 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/RecordPath.java
@@ -54,10 +54,11 @@ public interface RecordPath {
      * against a Record via {@link #evaluate(Record)} and then have a Relative RecordPath evaluated against
      * the results. This method will throw an Exception if this RecordPath is an Absolute RecordPath.
      *
+     * @param record the Record to evaluate
      * @param contextNode the context node that represents where in the Record the 'current node' or 'context node' is
      * @return a RecordPathResult that contains a FieldValue for each field that matches
      */
-    RecordPathResult evaluate(FieldValue contextNode);
+    RecordPathResult evaluate(Record record, FieldValue contextNode);
 
     /**
      * Indicates whether the RecordPath is an Absolute Path (starts with a '/' character) or a Relative Path (starts with a '.' character).

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
index 6d85436..6e7ea96 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/BinaryOperatorFilter.java
@@ -34,7 +34,7 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter {
     }
 
     @Override
-    public Stream<FieldValue> filter(final FieldValue currentNode, final RecordPathEvaluationContext context) {
+    public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
         final Stream<FieldValue> rhsStream = rhs.evaluate(context);
         final Optional<FieldValue> firstMatch = rhsStream
             .filter(fieldVal -> fieldVal.getValue() != null)
@@ -48,7 +48,10 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter {
         final Object value = fieldValue.getValue();
 
         final Stream<FieldValue> lhsStream = lhs.evaluate(context);
-        return lhsStream.filter(fieldVal -> test(fieldVal, value));
+        return lhsStream.filter(fieldVal -> {
+            final boolean result = test(fieldVal, value);
+            return invert ? !result : result;
+        });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java
new file mode 100644
index 0000000..385ed7a
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/Contains.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public class Contains extends StringComparisonFilter {
+
+    public Contains(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
+        super(recordPath, searchValuePath);
+    }
+
+    @Override
+    protected boolean isMatch(final String fieldValue, final String comparison) {
+        return fieldValue.contains(comparison);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java
new file mode 100644
index 0000000..02c3ca9
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/ContainsRegex.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.LiteralValuePath;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class ContainsRegex extends FunctionFilter {
+
+    private final RecordPathSegment regexPath;
+
+    private final Pattern compiledPattern;
+
+    public ContainsRegex(RecordPathSegment recordPath, final RecordPathSegment regexPath) {
+        super(recordPath);
+        this.regexPath = regexPath;
+
+        if (regexPath instanceof LiteralValuePath) {
+            final FieldValue fieldValue = ((LiteralValuePath) regexPath).evaluate((RecordPathEvaluationContext) null).findFirst().get();
+            final Object value = fieldValue.getValue();
+            final String regex = DataTypeUtils.toString(value, (String) null);
+            compiledPattern = Pattern.compile(regex);
+        } else {
+            compiledPattern = null;
+        }
+    }
+
+    @Override
+    protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+        final Pattern pattern;
+        if (compiledPattern == null) {
+            final Optional<FieldValue> fieldValueOption = regexPath.evaluate(context).findFirst();
+            if (!fieldValueOption.isPresent()) {
+                return false;
+            }
+
+            final Object value = fieldValueOption.get().getValue();
+            if (value == null) {
+                return false;
+            }
+
+            final String regex = DataTypeUtils.toString(value, (String) null);
+            pattern = Pattern.compile(regex);
+        } else {
+            pattern = compiledPattern;
+        }
+
+        final String searchString = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+        if (searchString == null) {
+            return false;
+        }
+
+        return pattern.matcher(searchString).find();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java
new file mode 100644
index 0000000..fdc9c86
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EndsWith.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public class EndsWith extends StringComparisonFilter {
+
+    public EndsWith(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
+        super(recordPath, searchValuePath);
+    }
+
+    @Override
+    protected boolean isMatch(final String fieldValue, final String comparison) {
+        return fieldValue.endsWith(comparison);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
index e03b6df..e51d276 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/EqualsFilter.java
@@ -37,10 +37,10 @@ public class EqualsFilter extends BinaryOperatorFilter {
             if (rhsValue instanceof Number) {
                 return compareNumbers((Number) lhsValue, (Number) rhsValue);
             } else {
-                return false;
+                return lhsValue.toString().equals(rhsValue.toString());
             }
         } else if (rhsValue instanceof Number) {
-            return false;
+            return lhsValue.toString().equals(rhsValue.toString());
         }
 
         return lhsValue.equals(rhsValue);

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
new file mode 100644
index 0000000..c7e6114
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/FunctionFilter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public abstract class FunctionFilter implements RecordPathFilter {
+    private final RecordPathSegment recordPath;
+
+    protected FunctionFilter(final RecordPathSegment recordPath) {
+        this.recordPath = recordPath;
+    }
+
+    @Override
+    public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
+        return recordPath.evaluate(context)
+            .filter(fv -> invert ? !test(fv, context) : test(fv, context));
+    }
+
+    protected abstract boolean test(FieldValue fieldValue, final RecordPathEvaluationContext context);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java
new file mode 100644
index 0000000..93011f9
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsBlank.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class IsBlank extends FunctionFilter {
+
+
+    public IsBlank(RecordPathSegment recordPath) {
+        super(recordPath);
+    }
+
+    @Override
+    protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+        final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+        if (fieldVal == null) {
+            return true;
+        }
+
+        return fieldVal.trim().isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java
new file mode 100644
index 0000000..823d2b3
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/IsEmpty.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class IsEmpty extends FunctionFilter {
+
+    public IsEmpty(RecordPathSegment recordPath) {
+        super(recordPath);
+    }
+
+    @Override
+    protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+        final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+        if (fieldVal == null) {
+            return true;
+        }
+
+        return fieldVal.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java
new file mode 100644
index 0000000..a50f895
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/MatchesRegex.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.LiteralValuePath;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class MatchesRegex extends FunctionFilter {
+
+    private final RecordPathSegment regexPath;
+
+    private final Pattern compiledPattern;
+
+    public MatchesRegex(RecordPathSegment recordPath, final RecordPathSegment regexPath) {
+        super(recordPath);
+        this.regexPath = regexPath;
+
+        if (regexPath instanceof LiteralValuePath) {
+            final FieldValue fieldValue = ((LiteralValuePath) regexPath).evaluate((RecordPathEvaluationContext) null).findFirst().get();
+            final Object value = fieldValue.getValue();
+            final String regex = DataTypeUtils.toString(value, (String) null);
+            compiledPattern = Pattern.compile(regex);
+        } else {
+            compiledPattern = null;
+        }
+    }
+
+    @Override
+    protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+        final Pattern pattern;
+        if (compiledPattern == null) {
+            final Optional<FieldValue> fieldValueOption = regexPath.evaluate(context).findFirst();
+            if (!fieldValueOption.isPresent()) {
+                return false;
+            }
+
+            final Object value = fieldValueOption.get().getValue();
+            if (value == null) {
+                return false;
+            }
+
+            final String regex = DataTypeUtils.toString(value, (String) null);
+            pattern = Pattern.compile(regex);
+        } else {
+            pattern = compiledPattern;
+        }
+
+        final String searchString = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+        if (searchString == null) {
+            return false;
+        }
+
+        return pattern.matcher(searchString).matches();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
index 159da4a..d632519 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotEqualsFilter.java
@@ -30,7 +30,7 @@ public class NotEqualsFilter extends BinaryOperatorFilter {
     protected boolean test(final FieldValue fieldValue, final Object rhsValue) {
         final Object lhsValue = fieldValue.getValue();
         if (lhsValue == null) {
-            return rhsValue != null;
+            return false;
         }
 
         if (lhsValue instanceof Number) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
new file mode 100644
index 0000000..bbe38ed
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/NotFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+
+public class NotFilter implements RecordPathFilter {
+    private final RecordPathFilter filter;
+
+    public NotFilter(final RecordPathFilter filter) {
+        this.filter = filter;
+    }
+
+    @Override
+    public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
+        return filter.filter(context, !invert);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
index 501fca0..389f6d3 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/RecordPathFilter.java
@@ -24,6 +24,6 @@ import org.apache.nifi.record.path.RecordPathEvaluationContext;
 
 public interface RecordPathFilter {
 
-    Stream<FieldValue> filter(FieldValue currentNode, RecordPathEvaluationContext context);
+    Stream<FieldValue> filter(RecordPathEvaluationContext context, boolean invert);
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java
new file mode 100644
index 0000000..4ddd4e8
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StartsWith.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public class StartsWith extends StringComparisonFilter {
+
+    public StartsWith(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
+        super(recordPath, searchValuePath);
+    }
+
+    @Override
+    protected boolean isMatch(final String fieldValue, final String comparison) {
+        return fieldValue.startsWith(comparison);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java
new file mode 100644
index 0000000..e7c35a0
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/filter/StringComparisonFilter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.filter;
+
+import java.util.Optional;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public abstract class StringComparisonFilter extends FunctionFilter {
+
+    private final RecordPathSegment searchValuePath;
+
+    public StringComparisonFilter(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
+        super(recordPath);
+        this.searchValuePath = searchValuePath;
+    }
+
+    @Override
+    protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
+        final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
+        if (fieldVal == null) {
+            return false;
+        }
+
+        final Optional<FieldValue> firstValue = searchValuePath.evaluate(context).findFirst();
+        if (!firstValue.isPresent()) {
+            return false;
+        }
+
+        final String searchValue = DataTypeUtils.toString(firstValue.get().getValue(), (String) null);
+        if (searchValue == null) {
+            return false;
+        }
+
+        return isMatch(fieldVal, searchValue);
+    }
+
+    protected abstract boolean isMatch(String fieldValue, String comparison);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java
new file mode 100644
index 0000000..daeee05
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Concat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class Concat extends RecordPathSegment {
+    private final RecordPathSegment[] valuePaths;
+
+    public Concat(final RecordPathSegment[] valuePaths, final boolean absolute) {
+        super("concat", null, absolute);
+        this.valuePaths = valuePaths;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        Stream<FieldValue> concatenated = Stream.empty();
+
+        for (final RecordPathSegment valuePath : valuePaths) {
+            final Stream<FieldValue> stream = valuePath.evaluate(context);
+            concatenated = Stream.concat(concatenated, stream);
+        }
+
+        final StringBuilder sb = new StringBuilder();
+        concatenated.forEach(fv -> sb.append(DataTypeUtils.toString(fv.getValue(), (String) null)));
+
+        final RecordField field = new RecordField("concat", RecordFieldType.STRING.getDataType());
+        final FieldValue responseValue = new StandardFieldValue(sb.toString(), field, null);
+        return Stream.of(responseValue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java
new file mode 100644
index 0000000..e05dca7
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Replace.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class Replace extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment searchValuePath;
+    private final RecordPathSegment replacementValuePath;
+
+    public Replace(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final RecordPathSegment replacementValue, final boolean absolute) {
+        super("replace", null, absolute);
+
+        this.recordPath = recordPath;
+        this.searchValuePath = searchValue;
+        this.replacementValuePath = replacementValue;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+                if (searchValue == null) {
+                    return fv;
+                }
+
+                final String replacementValue = RecordPathUtils.getFirstStringValue(replacementValuePath, context);
+                if (replacementValue == null) {
+                    return fv;
+                }
+
+                final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+                final String replaced = value.replace(searchValue, replacementValue);
+                return new StandardFieldValue(replaced, fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java
new file mode 100644
index 0000000..09df8a7
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceNull.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+
+public class ReplaceNull extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment replacementValuePath;
+
+    public ReplaceNull(final RecordPathSegment recordPath, final RecordPathSegment replacementValue, final boolean absolute) {
+        super("replaceNull", null, absolute);
+
+        this.recordPath = recordPath;
+        this.replacementValuePath = replacementValue;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues
+            .map(fv -> {
+                if (fv.getValue() != null) {
+                    return fv;
+                }
+
+                final Optional<FieldValue> replacementOption = replacementValuePath.evaluate(context).findFirst();
+                if (!replacementOption.isPresent()) {
+                    return fv;
+                }
+
+                final FieldValue replacementFieldValue = replacementOption.get();
+                final Object replacementValue = replacementFieldValue.getValue();
+                if (replacementValue == null) {
+                    return fv;
+                }
+
+                return new StandardFieldValue(replacementValue, fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java
new file mode 100644
index 0000000..06f9f53
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/ReplaceRegex.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.LiteralValuePath;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class ReplaceRegex extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment searchValuePath;
+    private final RecordPathSegment replacementValuePath;
+
+    private final Pattern compiledPattern;
+
+    public ReplaceRegex(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final RecordPathSegment replacementValue, final boolean absolute) {
+        super("replaceRegex", null, absolute);
+
+        this.recordPath = recordPath;
+        this.searchValuePath = searchValue;
+        if (searchValue instanceof LiteralValuePath) {
+            final FieldValue fieldValue = ((LiteralValuePath) searchValue).evaluate((RecordPathEvaluationContext) null).findFirst().get();
+            final Object value = fieldValue.getValue();
+            final String regex = DataTypeUtils.toString(value, (String) null);
+            compiledPattern = Pattern.compile(regex);
+        } else {
+            compiledPattern = null;
+        }
+
+        this.replacementValuePath = replacementValue;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+
+                // Determine the Replacement Value
+                final String replacementValue = RecordPathUtils.getFirstStringValue(replacementValuePath, context);
+                if (replacementValue == null) {
+                    return fv;
+                }
+
+                final Pattern pattern;
+                if (compiledPattern == null) {
+                    final Optional<FieldValue> fieldValueOption = searchValuePath.evaluate(context).findFirst();
+                    if (!fieldValueOption.isPresent()) {
+                        return fv;
+                    }
+
+                    final Object fieldValue = fieldValueOption.get().getValue();
+                    if (value == null) {
+                        return fv;
+                    }
+
+                    final String regex = DataTypeUtils.toString(fieldValue, (String) null);
+                    pattern = Pattern.compile(regex);
+                } else {
+                    pattern = compiledPattern;
+                }
+
+                final String replaced = pattern.matcher(value).replaceAll(replacementValue);
+                return new StandardFieldValue(replaced, fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java
new file mode 100644
index 0000000..89d2f32
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/Substring.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class Substring extends RecordPathSegment {
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment startIndexPath;
+    private final RecordPathSegment endIndexPath;
+
+    public Substring(final RecordPathSegment recordPath, final RecordPathSegment startIndex, final RecordPathSegment endIndex, final boolean absolute) {
+        super("substring", null, absolute);
+
+        this.recordPath = recordPath;
+        this.startIndexPath = startIndex;
+        this.endIndexPath = endIndex;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final OptionalInt startIndex = getIndex(startIndexPath, context);
+                if (!startIndex.isPresent()) {
+                    return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+                }
+
+                final OptionalInt endIndex = getIndex(endIndexPath, context);
+                if (!endIndex.isPresent()) {
+                    return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+                }
+
+                final int start = startIndex.getAsInt();
+                final int end = endIndex.getAsInt();
+
+                final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+
+                // Allow for negative indices to be used to reference offset from string length. We add 1 here because we want -1 to refer
+                // to the actual length of the string.
+                final int evaluatedEndIndex = end < 0 ? value.length() + 1 + end : end;
+                final int evaluatedStartIndex = start < 0 ? value.length() + 1 + start : start;
+
+                if (evaluatedEndIndex <= evaluatedStartIndex || evaluatedStartIndex < 0 || evaluatedStartIndex > value.length()) {
+                    return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+                }
+
+                final String substring = value.substring(evaluatedStartIndex, Math.min(evaluatedEndIndex, value.length()));
+                return new StandardFieldValue(substring, fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+    private OptionalInt getIndex(final RecordPathSegment indexSegment, final RecordPathEvaluationContext context) {
+        final Optional<FieldValue> firstFieldValueOption = indexSegment.evaluate(context).findFirst();
+        if (!firstFieldValueOption.isPresent()) {
+            return OptionalInt.empty();
+        }
+
+        final FieldValue fieldValue = firstFieldValueOption.get();
+        final Object indexObject = fieldValue.getValue();
+        if (!DataTypeUtils.isIntegerTypeCompatible(indexObject)) {
+            return OptionalInt.empty();
+        }
+
+        final String fieldName;
+        final RecordField field = fieldValue.getField();
+        fieldName = field == null ? "<Unknown Field>" : field.getFieldName();
+
+        return OptionalInt.of(DataTypeUtils.toInteger(indexObject, fieldName));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java
new file mode 100644
index 0000000..4bdd6b3
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class SubstringAfter extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment searchValuePath;
+
+    public SubstringAfter(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
+        super("substringAfter", null, absolute);
+
+        this.recordPath = recordPath;
+        this.searchValuePath = searchValue;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+                if (searchValue == null || searchValue.isEmpty()) {
+                    return fv;
+                }
+
+                final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+                final int index = value.indexOf(searchValue);
+                if (index < 0) {
+                    return fv;
+                }
+
+                if (value.length() < index + 1) {
+                    return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+                }
+
+                return new StandardFieldValue(value.substring(index + 1), fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java
new file mode 100644
index 0000000..71af1b9
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringAfterLast.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class SubstringAfterLast extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment searchValuePath;
+
+    public SubstringAfterLast(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
+        super("substringAfterLast", null, absolute);
+
+        this.recordPath = recordPath;
+        this.searchValuePath = searchValue;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+                if (searchValue == null || searchValue.isEmpty()) {
+                    return fv;
+                }
+
+                final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+                final int index = value.lastIndexOf(searchValue);
+                if (index < 0) {
+                    return fv;
+                }
+
+                if (value.length() < index + 1) {
+                    return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
+                }
+
+                return new StandardFieldValue(value.substring(index + 1), fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java
new file mode 100644
index 0000000..ddadc8e
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBefore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class SubstringBefore extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment searchValuePath;
+
+    public SubstringBefore(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
+        super("substringBefore", null, absolute);
+
+        this.recordPath = recordPath;
+        this.searchValuePath = searchValue;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+                if (searchValue == null || searchValue.isEmpty()) {
+                    return fv;
+                }
+
+                final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+                final int index = value.indexOf(searchValue);
+                if (index < 0) {
+                    return fv;
+                }
+
+                return new StandardFieldValue(value.substring(0, index), fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java
new file mode 100644
index 0000000..ed3aa5e
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/functions/SubstringBeforeLast.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.functions;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.record.path.util.RecordPathUtils;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class SubstringBeforeLast extends RecordPathSegment {
+
+    private final RecordPathSegment recordPath;
+    private final RecordPathSegment searchValuePath;
+
+    public SubstringBeforeLast(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
+        super("substringBeforeLast", null, absolute);
+
+        this.recordPath = recordPath;
+        this.searchValuePath = searchValue;
+    }
+
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
+        return fieldValues.filter(fv -> fv.getValue() != null)
+            .map(fv -> {
+                final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
+                if (searchValue == null || searchValue.isEmpty()) {
+                    return fv;
+                }
+
+                final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
+                final int index = value.lastIndexOf(searchValue);
+                if (index < 0) {
+                    return fv;
+                }
+
+                return new StandardFieldValue(value.substring(0, index), fv.getField(), fv.getParent().orElse(null));
+            });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
index 759a848..7e87344 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
@@ -43,17 +43,12 @@ public class PredicatePath extends RecordPathSegment {
             context.setContextNode(fieldVal);
             try {
                 // Really what we want to do is filter out Stream<FieldValue> but that becomes very difficult
-                // to implement for the RecordPathFilter's. So, instead, we pass the FieldValue to field and
+                // to implement for the RecordPathFilter's. So, instead, we pass
                 // the RecordPathEvaluationContext and receive back a Stream<FieldValue>. Since this is a Predicate,
                 // though, we don't want to transform our Stream - we just want to filter it. So we handle this by
                 // mapping the result back to fieldVal. And since this predicate shouldn't return the same field multiple
-                // times, we will limit the stream to 1 element. We also filter out any FieldValue whose value is null.
-                // This is done because if we have a predicate like [./iDoNotExist != 'hello'] then the relative path will
-                // return a value of null and that will be compared to 'hello'. Since they are not equal, the NotEqualsFilter
-                // will return 'true', so we will get back a FieldValue with a null value. This should not make the Predicate
-                // true.
-                return filter.filter(fieldVal, context)
-                    .filter(fv -> fv.getValue() != null)
+                // times, we will limit the stream to 1 element.
+                return filter.filter(context, false)
                     .limit(1)
                     .map(ignore -> fieldVal);
             } finally {

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
index 24b872a..ef372ac 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -23,6 +23,7 @@ import static org.apache.nifi.record.path.RecordPathParser.CURRENT_FIELD;
 import static org.apache.nifi.record.path.RecordPathParser.DESCENDANT_REFERENCE;
 import static org.apache.nifi.record.path.RecordPathParser.EQUAL;
 import static org.apache.nifi.record.path.RecordPathParser.FIELD_NAME;
+import static org.apache.nifi.record.path.RecordPathParser.FUNCTION;
 import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN;
 import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN_EQUAL;
 import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN;
@@ -48,17 +49,38 @@ import java.util.function.BiFunction;
 import org.antlr.runtime.tree.Tree;
 import org.apache.nifi.record.path.NumericRange;
 import org.apache.nifi.record.path.exception.RecordPathException;
+import org.apache.nifi.record.path.filter.Contains;
+import org.apache.nifi.record.path.filter.ContainsRegex;
+import org.apache.nifi.record.path.filter.EndsWith;
 import org.apache.nifi.record.path.filter.EqualsFilter;
 import org.apache.nifi.record.path.filter.GreaterThanFilter;
 import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter;
+import org.apache.nifi.record.path.filter.IsBlank;
+import org.apache.nifi.record.path.filter.IsEmpty;
 import org.apache.nifi.record.path.filter.LessThanFilter;
 import org.apache.nifi.record.path.filter.LessThanOrEqualFilter;
+import org.apache.nifi.record.path.filter.MatchesRegex;
 import org.apache.nifi.record.path.filter.NotEqualsFilter;
+import org.apache.nifi.record.path.filter.NotFilter;
 import org.apache.nifi.record.path.filter.RecordPathFilter;
+import org.apache.nifi.record.path.filter.StartsWith;
+import org.apache.nifi.record.path.functions.Concat;
+import org.apache.nifi.record.path.functions.Replace;
+import org.apache.nifi.record.path.functions.ReplaceNull;
+import org.apache.nifi.record.path.functions.ReplaceRegex;
+import org.apache.nifi.record.path.functions.Substring;
+import org.apache.nifi.record.path.functions.SubstringAfter;
+import org.apache.nifi.record.path.functions.SubstringAfterLast;
+import org.apache.nifi.record.path.functions.SubstringBefore;
+import org.apache.nifi.record.path.functions.SubstringBeforeLast;
 
 public class RecordPathCompiler {
 
     public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) {
+        if (pathTree.getType() == FUNCTION) {
+            return buildPath(pathTree, null, absolute);
+        }
+
         RecordPathSegment parent = root;
         for (int i = 0; i < pathTree.getChildCount(); i++) {
             final Tree child = pathTree.getChild(i);
@@ -168,6 +190,59 @@ public class RecordPathCompiler {
             case PATH: {
                 return compile(tree, new RootPath(), absolute);
             }
+            case FUNCTION: {
+                final String functionName = tree.getChild(0).getText();
+                final Tree argumentListTree = tree.getChild(1);
+
+                switch (functionName) {
+                    case "substring": {
+                        final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
+                        return new Substring(args[0], args[1], args[2], absolute);
+                    }
+                    case "substringAfter": {
+                        final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                        return new SubstringAfter(args[0], args[1], absolute);
+                    }
+                    case "substringAfterLast": {
+                        final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                        return new SubstringAfterLast(args[0], args[1], absolute);
+                    }
+                    case "substringBefore": {
+                        final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                        return new SubstringBefore(args[0], args[1], absolute);
+                    }
+                    case "substringBeforeLast": {
+                        final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                        return new SubstringBeforeLast(args[0], args[1], absolute);
+                    }
+                    case "replace": {
+                        final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
+                        return new Replace(args[0], args[1], args[2], absolute);
+                    }
+                    case "replaceRegex": {
+                        final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
+                        return new ReplaceRegex(args[0], args[1], args[2], absolute);
+                    }
+                    case "replaceNull": {
+                        final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                        return new ReplaceNull(args[0], args[1], absolute);
+                    }
+                    case "concat": {
+                        final int numArgs = argumentListTree.getChildCount();
+
+                        final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
+                        for (int i = 0; i < numArgs; i++) {
+                            argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
+                        }
+
+                        return new Concat(argPaths, absolute);
+                    }
+                    default: {
+                        throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only "
+                            + "be used within a predicate, not as a standalone function");
+                    }
+                }
+            }
         }
 
         throw new RecordPathException("Encountered unexpected token " + tree);
@@ -187,6 +262,8 @@ public class RecordPathCompiler {
                 return createBinaryOperationFilter(operatorTree, parent, GreaterThanFilter::new, absolute);
             case GREATER_THAN_EQUAL:
                 return createBinaryOperationFilter(operatorTree, parent, GreaterThanOrEqualFilter::new, absolute);
+            case FUNCTION:
+                return createFunctionFilter(operatorTree, absolute);
             default:
                 throw new RecordPathException("Expected an Expression of form <value> <operator> <value> to follow '[' Token but found " + operatorTree);
         }
@@ -200,4 +277,66 @@ public class RecordPathCompiler {
         final RecordPathSegment rhsPath = buildPath(rhsTree, parent, absolute);
         return function.apply(lhsPath, rhsPath);
     }
+
+    private static RecordPathFilter createFunctionFilter(final Tree functionTree, final boolean absolute) {
+        final String functionName = functionTree.getChild(0).getText();
+        final Tree argumentListTree = functionTree.getChild(1);
+
+        switch (functionName) {
+            case "contains": {
+                final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                return new Contains(args[0], args[1]);
+            }
+            case "matchesRegex": {
+                final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                return new MatchesRegex(args[0], args[1]);
+            }
+            case "containsRegex": {
+                final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                return new ContainsRegex(args[0], args[1]);
+            }
+            case "startsWith": {
+                final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                return new StartsWith(args[0], args[1]);
+            }
+            case "endsWith": {
+                final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
+                return new EndsWith(args[0], args[1]);
+            }
+            case "isEmpty": {
+                final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
+                return new IsEmpty(args[0]);
+            }
+            case "isBlank": {
+                final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
+                return new IsBlank(args[0]);
+            }
+            case "not": {
+                final int numArgs = argumentListTree.getChildCount();
+                if (numArgs != 1) {
+                    throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes 1 argument but got " + numArgs);
+                }
+
+                final Tree childTree = argumentListTree.getChild(0);
+                final RecordPathFilter childFilter = createFilter(childTree, null, absolute);
+                return new NotFilter(childFilter);
+            }
+        }
+
+        throw new RecordPathException("Invalid function name: " + functionName);
+    }
+
+    private static RecordPathSegment[] getArgPaths(final Tree argumentListTree, final int expectedCount, final String functionName, final boolean absolute) {
+        final int numArgs = argumentListTree.getChildCount();
+        if (numArgs != expectedCount) {
+            throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes " + expectedCount + " arguments but got " + numArgs);
+        }
+
+        final RecordPathSegment[] argPaths = new RecordPathSegment[expectedCount];
+        for (int i = 0; i < expectedCount; i++) {
+            argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
+        }
+
+        return argPaths;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
index 92ff010..845ca84 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
@@ -25,7 +25,6 @@ import org.apache.nifi.record.path.RecordPath;
 import org.apache.nifi.record.path.RecordPathEvaluationContext;
 import org.apache.nifi.record.path.RecordPathResult;
 import org.apache.nifi.record.path.StandardRecordPathEvaluationContext;
-import org.apache.nifi.record.path.util.Filters;
 import org.apache.nifi.serialization.record.Record;
 
 public abstract class RecordPathSegment implements RecordPath {
@@ -33,7 +32,7 @@ public abstract class RecordPathSegment implements RecordPath {
     private final RecordPathSegment parentPath;
     private final boolean absolute;
 
-    RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) {
+    public RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) {
         this.path = path;
         this.parentPath = parentPath;
         this.absolute = absolute;
@@ -98,34 +97,8 @@ public abstract class RecordPathSegment implements RecordPath {
     }
 
     @Override
-    public final RecordPathResult evaluate(final FieldValue contextNode) {
-        final RecordPathEvaluationContext context;
-        if (Filters.isRecord(contextNode.getField().getDataType(), contextNode.getValue())) {
-            final Record record = (Record) contextNode.getValue();
-            if (record == null) {
-                return new RecordPathResult() {
-                    @Override
-                    public String getPath() {
-                        return RecordPathSegment.this.getPath();
-                    }
-
-                    @Override
-                    public Stream<FieldValue> getSelectedFields() {
-                        return Stream.empty();
-                    }
-                };
-            }
-
-            context = new StandardRecordPathEvaluationContext(record);
-        } else {
-            final FieldValue parent = contextNode.getParent().orElse(null);
-            if (parent == null) {
-                context = new StandardRecordPathEvaluationContext(null);
-            } else {
-                context = new StandardRecordPathEvaluationContext(parent.getParentRecord().orElse(null));
-            }
-        }
-
+    public final RecordPathResult evaluate(final Record record, final FieldValue contextNode) {
+        final RecordPathEvaluationContext context = new StandardRecordPathEvaluationContext(record);
         context.setContextNode(contextNode);
         final Stream<FieldValue> selectedFields = evaluate(context);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/32314d70/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java
new file mode 100644
index 0000000..e8056e4
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.util;
+
+import java.util.Optional;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.paths.RecordPathSegment;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+public class RecordPathUtils {
+
+    public static String getFirstStringValue(final RecordPathSegment segment, final RecordPathEvaluationContext context) {
+        final Optional<FieldValue> stringFieldValue = segment.evaluate(context).findFirst();
+        if (!stringFieldValue.isPresent()) {
+            return null;
+        }
+
+        final String stringValue = DataTypeUtils.toString(stringFieldValue.get().getValue(), (String) null);
+        if (stringValue == null) {
+            return null;
+        }
+
+        return stringValue;
+    }
+}