You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/05/12 16:49:12 UTC
[4/5] nifi git commit: NIFI-3838: Initial implementation of
RecordPath and UpdateRecord processor
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java
new file mode 100644
index 0000000..9966c3d
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.ArrayIndexFieldValue;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.NumericRange;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+
+/**
+ * RecordPath segment that selects several elements of an array at once. The selection is given as a
+ * list of {@link NumericRange}s; a single index is expressed as a range whose minimum and maximum are equal.
+ * Negative bounds are resolved relative to the end of the array (<code>-1</code> is the last element).
+ */
+public class MultiArrayIndexPath extends RecordPathSegment {
+    private final List<NumericRange> indices;
+
+    MultiArrayIndexPath(final List<NumericRange> indices, final RecordPathSegment parent, final boolean absolute) {
+        super(indices.toString(), parent, absolute);
+        this.indices = indices;
+    }
+
+    /**
+     * Evaluates the parent path, keeps only the results that pass the ARRAY field type filter and, for each
+     * of them, emits one {@link ArrayIndexFieldValue} per selected element.
+     *
+     * @param context the evaluation context handed on to the parent path
+     * @return the selected array elements, in the order of the configured ranges and, within a range, in
+     *         ascending index order
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return getParentPath().evaluate(context)
+            .filter(Filters.fieldTypeFilter(RecordFieldType.ARRAY))
+            .flatMap(this::selectElements);
+    }
+
+    /**
+     * Resolves every configured range against the given array and wraps each selected element in an
+     * {@link ArrayIndexFieldValue} that carries the element's real position in the array.
+     */
+    private Stream<FieldValue> selectElements(final FieldValue arrayFieldValue) {
+        final Object[] values = (Object[]) arrayFieldValue.getValue();
+        if (values == null) {
+            // Nothing can be selected from a null array; do not fail on values.length.
+            return Stream.empty();
+        }
+
+        final ArrayDataType arrayDataType = (ArrayDataType) arrayFieldValue.getField().getDataType();
+        final DataType elementDataType = arrayDataType.getElementType();
+        final String arrayFieldName = arrayFieldValue.getField().getFieldName();
+
+        final List<FieldValue> selected = new ArrayList<>();
+        for (final NumericRange range : indices) {
+            // Negative bounds count backwards from the end of the array.
+            final int first = range.getMin() < 0 ? values.length + range.getMin() : range.getMin();
+            final int requestedLast = range.getMax() < 0 ? values.length + range.getMax() : range.getMax();
+
+            if (first < 0) {
+                // The range starts before the beginning of the array: skip it entirely.
+                // A start of exactly -values.length resolves to 0 and is valid.
+                continue;
+            }
+
+            // Clamp the upper bound to the last existing element. If the range starts beyond the end of
+            // the array (or the resolved end precedes the start), rangeClosed yields no indices at all.
+            final int last = Math.min(requestedLast, values.length - 1);
+
+            IntStream.rangeClosed(first, last)
+                .mapToObj(index -> {
+                    // Use the real array index, not the position inside the selection, so that the
+                    // resulting FieldValue refers to the element that was actually selected.
+                    final RecordField elementField = new RecordField(arrayFieldName + "[" + index + "]", elementDataType);
+                    return new ArrayIndexFieldValue(values[index], elementField, arrayFieldValue, index);
+                })
+                .forEach(selected::add);
+        }
+
+        return selected.stream();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java
new file mode 100644
index 0000000..e9b1874
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.MapEntryFieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+
+/**
+ * RecordPath segment that looks up several keys of a map at once and yields one
+ * {@link MapEntryFieldValue} per requested key.
+ */
+public class MultiMapKeyPath extends RecordPathSegment {
+    private final List<String> mapKeys;
+
+    MultiMapKeyPath(final List<String> mapKeys, final RecordPathSegment parent, final boolean absolute) {
+        super(mapKeys.toString(), parent, absolute);
+        this.mapKeys = mapKeys;
+    }
+
+    /**
+     * Evaluates the parent path, keeps the results that pass the MAP field type filter and emits, for each
+     * of them, one entry value per configured key (in the order of the configured keys).
+     *
+     * @param context the evaluation context handed on to the parent path
+     * @return the map entry values for the requested keys
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return getParentPath().evaluate(context)
+            .filter(Filters.fieldTypeFilter(RecordFieldType.MAP))
+            .flatMap(mapFieldValue -> {
+                final Map<String, ?> entries = (Map<String, ?>) mapFieldValue.getValue();
+                return mapKeys.stream().map(requestedKey -> toEntryValue(mapFieldValue, entries, requestedKey));
+            });
+    }
+
+    /**
+     * Builds the field value for a single key: the element field is named <code>name['key']</code> and is
+     * typed with the map's value type; the value is whatever the map returns for that key.
+     */
+    private FieldValue toEntryValue(final FieldValue mapFieldValue, final Map<String, ?> entries, final String requestedKey) {
+        final MapDataType mapDataType = (MapDataType) mapFieldValue.getField().getDataType();
+        final DataType entryType = mapDataType.getValueType();
+        final String entryFieldName = mapFieldValue.getField().getFieldName() + "['" + requestedKey + "']";
+        final RecordField entryField = new RecordField(entryFieldName, entryType);
+        return new MapEntryFieldValue(entries.get(requestedKey), entryField, mapFieldValue, requestedKey);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ParentPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ParentPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ParentPath.java
new file mode 100644
index 0000000..bd5bf97
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ParentPath.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.util.Filters;
+
+
+/**
+ * RecordPath segment for the <code>..</code> token: maps every incoming field value to its parent.
+ */
+public class ParentPath extends RecordPathSegment {
+
+    ParentPath(final RecordPathSegment parent, final boolean absolute) {
+        super("..", parent, absolute);
+    }
+
+    /**
+     * Takes the field values produced by the preceding segment (or the context node when there is no
+     * preceding segment) and replaces each one with its parent. Parents are passed through
+     * {@code Filters.presentValues}.
+     *
+     * @param context the evaluation context
+     * @return the parents of the incoming field values
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final RecordPathSegment precedingSegment = getParentPath();
+        final Stream<FieldValue> startingPoints = precedingSegment == null
+            ? Stream.of(context.getContextNode())
+            : precedingSegment.evaluate(context);
+
+        return Filters.presentValues(startingPoints.map(FieldValue::getParent));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
new file mode 100644
index 0000000..759a848
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.filter.RecordPathFilter;
+
+/**
+ * RecordPath segment for a predicate such as <code>[./name = 'x']</code>: it never transforms the incoming
+ * field values, it only decides which of them are kept.
+ */
+public class PredicatePath extends RecordPathSegment {
+    private final RecordPathFilter filter;
+
+    public PredicatePath(final RecordPathSegment parent, final RecordPathFilter filter, final boolean absolute) {
+        super("[" + filter + "]", parent, absolute);
+        this.filter = filter;
+    }
+
+    /**
+     * Evaluates the parent path and keeps each resulting field value at most once, and only if the filter
+     * produces at least one result with a non-null value for it.
+     *
+     * @param context the evaluation context; its context node is temporarily switched to the field value
+     *            under test and restored afterwards
+     * @return the field values of the parent path that satisfy the predicate
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return getParentPath().evaluate(context)
+            .flatMap(candidate -> keepIfMatching(candidate, context));
+    }
+
+    /**
+     * Returns a stream holding {@code candidate} if the filter accepts it, or an empty stream otherwise.
+     */
+    private Stream<FieldValue> keepIfMatching(final FieldValue candidate, final RecordPathEvaluationContext context) {
+        // While the predicate is applied, the candidate itself acts as the 'context node'.
+        // The node that was active before is put back no matter how the filter call ends.
+        final FieldValue enclosingContextNode = context.getContextNode();
+        context.setContextNode(candidate);
+        try {
+            // A RecordPathFilter hands back a Stream<FieldValue> rather than a boolean, so the decision is
+            // derived from that stream:
+            //  - results whose value is null are dropped. A predicate like [./iDoNotExist != 'hello'] compares
+            //    null to 'hello'; NotEqualsFilter reports 'true' for that and returns a FieldValue with a null
+            //    value, which must not make the predicate true.
+            //  - limit(1) guarantees the candidate is not emitted more than once.
+            //  - every remaining result is mapped back to the candidate, because a predicate filters and
+            //    must not transform.
+            return filter.filter(candidate, context)
+                .filter(result -> result.getValue() != null)
+                .limit(1)
+                .map(result -> candidate);
+        } finally {
+            context.setContextNode(enclosingContextNode);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
new file mode 100644
index 0000000..24b872a
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import static org.apache.nifi.record.path.RecordPathParser.ARRAY_INDEX;
+import static org.apache.nifi.record.path.RecordPathParser.CHILD_REFERENCE;
+import static org.apache.nifi.record.path.RecordPathParser.CURRENT_FIELD;
+import static org.apache.nifi.record.path.RecordPathParser.DESCENDANT_REFERENCE;
+import static org.apache.nifi.record.path.RecordPathParser.EQUAL;
+import static org.apache.nifi.record.path.RecordPathParser.FIELD_NAME;
+import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN;
+import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN_EQUAL;
+import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN;
+import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN_EQUAL;
+import static org.apache.nifi.record.path.RecordPathParser.MAP_KEY;
+import static org.apache.nifi.record.path.RecordPathParser.NOT_EQUAL;
+import static org.apache.nifi.record.path.RecordPathParser.NUMBER;
+import static org.apache.nifi.record.path.RecordPathParser.NUMBER_LIST;
+import static org.apache.nifi.record.path.RecordPathParser.NUMBER_RANGE;
+import static org.apache.nifi.record.path.RecordPathParser.PARENT_REFERENCE;
+import static org.apache.nifi.record.path.RecordPathParser.PATH;
+import static org.apache.nifi.record.path.RecordPathParser.PREDICATE;
+import static org.apache.nifi.record.path.RecordPathParser.RELATIVE_PATH;
+import static org.apache.nifi.record.path.RecordPathParser.ROOT_REFERENCE;
+import static org.apache.nifi.record.path.RecordPathParser.STRING_LIST;
+import static org.apache.nifi.record.path.RecordPathParser.STRING_LITERAL;
+import static org.apache.nifi.record.path.RecordPathParser.WILDCARD;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.nifi.record.path.NumericRange;
+import org.apache.nifi.record.path.exception.RecordPathException;
+import org.apache.nifi.record.path.filter.EqualsFilter;
+import org.apache.nifi.record.path.filter.GreaterThanFilter;
+import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter;
+import org.apache.nifi.record.path.filter.LessThanFilter;
+import org.apache.nifi.record.path.filter.LessThanOrEqualFilter;
+import org.apache.nifi.record.path.filter.NotEqualsFilter;
+import org.apache.nifi.record.path.filter.RecordPathFilter;
+
+/**
+ * Turns the syntax tree of a parsed RecordPath into a chain of {@link RecordPathSegment}s.
+ */
+public class RecordPathCompiler {
+
+    /**
+     * Compiles every child of {@code pathTree} in order, chaining each new segment onto the previous one.
+     *
+     * @param pathTree the tree whose children are the segments of the path
+     * @param root the segment the first child is attached to
+     * @param absolute whether the path being compiled is absolute
+     * @return the last segment of the chain, or {@code root} if the tree has no children
+     */
+    public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) {
+        RecordPathSegment tail = root;
+        for (int childIndex = 0; childIndex < pathTree.getChildCount(); childIndex++) {
+            tail = buildPath(pathTree.getChild(childIndex), tail, absolute);
+        }
+
+        return tail;
+    }
+
+    /**
+     * Builds the segment that corresponds to a single node of the syntax tree.
+     *
+     * @param tree the node to translate
+     * @param parent the segment the new segment is attached to
+     * @param absolute whether the path being compiled is absolute
+     * @return the segment for the given node
+     * @throws RecordPathException if the node, or one of the nodes below it, has an unexpected type
+     */
+    public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegment parent, final boolean absolute) {
+        switch (tree.getType()) {
+            case ROOT_REFERENCE:
+                return new RootPath();
+            case CHILD_REFERENCE:
+                return buildChildPath(tree.getChild(0), parent, absolute);
+            case ARRAY_INDEX:
+                return buildArrayIndexPath(tree.getChild(0), parent, absolute);
+            case MAP_KEY:
+                return buildMapKeyPath(tree.getChild(0), parent, absolute);
+            case WILDCARD:
+                return new WildcardIndexPath(parent, absolute);
+            case DESCENDANT_REFERENCE:
+                return buildDescendantPath(tree.getChild(0), parent, absolute);
+            case PARENT_REFERENCE:
+                return new ParentPath(parent, absolute);
+            case CURRENT_FIELD:
+                return new CurrentFieldPath(parent, absolute);
+            case STRING_LITERAL:
+                return new LiteralValuePath(parent, tree.getText(), absolute);
+            case NUMBER:
+                return new LiteralValuePath(parent, Integer.parseInt(tree.getText()), absolute);
+            case PREDICATE: {
+                final RecordPathFilter predicateFilter = createFilter(tree.getChild(0), parent, absolute);
+                return new PredicatePath(parent, predicateFilter, absolute);
+            }
+            case RELATIVE_PATH:
+                return compile(tree, parent, absolute);
+            case PATH:
+                return compile(tree, new RootPath(), absolute);
+            default:
+                throw new RecordPathException("Encountered unexpected token " + tree);
+        }
+    }
+
+    /** Builds the segment for a '/' token: either a named child field or the child wildcard. */
+    private static RecordPathSegment buildChildPath(final Tree childTree, final RecordPathSegment parent, final boolean absolute) {
+        switch (childTree.getType()) {
+            case FIELD_NAME:
+                return new ChildFieldPath(childTree.getChild(0).getText(), parent, absolute);
+            case WILDCARD:
+                return new WildcardChildPath(parent, absolute);
+            default:
+                throw new RecordPathException("Expected field name following '/' Token but found " + childTree);
+        }
+    }
+
+    /** Builds the segment for a '//' token, which must be followed by a field name. */
+    private static RecordPathSegment buildDescendantPath(final Tree childTree, final RecordPathSegment parent, final boolean absolute) {
+        if (childTree.getType() != FIELD_NAME) {
+            throw new RecordPathException("Expected field name following '//' Token but found " + childTree);
+        }
+
+        return new DescendantFieldPath(childTree.getChild(0).getText(), parent, absolute);
+    }
+
+    /**
+     * Builds the segment for an array index expression. A list holding exactly one plain number becomes an
+     * {@link ArrayIndexPath}; anything else becomes a {@link MultiArrayIndexPath} over numbers and ranges.
+     */
+    private static RecordPathSegment buildArrayIndexPath(final Tree indexListTree, final RecordPathSegment parent, final boolean absolute) {
+        if (indexListTree.getType() != NUMBER_LIST) {
+            throw new RecordPathException("Expected Number or Range following '[' Token but found " + indexListTree);
+        }
+
+        if (indexListTree.getChildCount() == 1 && indexListTree.getChild(0).getType() == NUMBER) {
+            final int singleIndex = Integer.parseInt(indexListTree.getChild(0).getText());
+            return new ArrayIndexPath(singleIndex, parent, absolute);
+        }
+
+        final List<NumericRange> ranges = new ArrayList<>();
+        for (int entryIndex = 0; entryIndex < indexListTree.getChildCount(); entryIndex++) {
+            final Tree entry = indexListTree.getChild(entryIndex);
+            switch (entry.getType()) {
+                case NUMBER: {
+                    // A lone number is stored as a range that starts and ends at that number.
+                    final int position = Integer.parseInt(entry.getText());
+                    ranges.add(new NumericRange(position, position));
+                    break;
+                }
+                case NUMBER_RANGE: {
+                    final int lowerBound = Integer.parseInt(entry.getChild(0).getText());
+                    final int upperBound = Integer.parseInt(entry.getChild(1).getText());
+                    ranges.add(new NumericRange(lowerBound, upperBound));
+                    break;
+                }
+                default:
+                    throw new RecordPathException("Expected Number or Range following '[' Token but found " + entry);
+            }
+        }
+
+        return new MultiArrayIndexPath(ranges, parent, absolute);
+    }
+
+    /**
+     * Builds the segment for a map key expression. A single key becomes a {@link SingularMapKeyPath},
+     * several keys become a {@link MultiMapKeyPath}.
+     */
+    private static RecordPathSegment buildMapKeyPath(final Tree keyTree, final RecordPathSegment parent, final boolean absolute) {
+        if (keyTree.getType() != STRING_LIST) {
+            throw new RecordPathException("Expected Map Key following '[' Token but found " + keyTree);
+        }
+
+        if (keyTree.getChildCount() == 1) {
+            return new SingularMapKeyPath(keyTree.getChild(0).getText(), parent, absolute);
+        }
+
+        final List<String> requestedKeys = new ArrayList<>(keyTree.getChildCount());
+        for (int keyIndex = 0; keyIndex < keyTree.getChildCount(); keyIndex++) {
+            requestedKeys.add(keyTree.getChild(keyIndex).getText());
+        }
+
+        return new MultiMapKeyPath(requestedKeys, parent, absolute);
+    }
+
+    /** Picks the filter implementation that matches the comparison operator at the top of a predicate. */
+    private static RecordPathFilter createFilter(final Tree operatorTree, final RecordPathSegment parent, final boolean absolute) {
+        final BiFunction<RecordPathSegment, RecordPathSegment, RecordPathFilter> filterFactory;
+        switch (operatorTree.getType()) {
+            case EQUAL:
+                filterFactory = EqualsFilter::new;
+                break;
+            case NOT_EQUAL:
+                filterFactory = NotEqualsFilter::new;
+                break;
+            case LESS_THAN:
+                filterFactory = LessThanFilter::new;
+                break;
+            case LESS_THAN_EQUAL:
+                filterFactory = LessThanOrEqualFilter::new;
+                break;
+            case GREATER_THAN:
+                filterFactory = GreaterThanFilter::new;
+                break;
+            case GREATER_THAN_EQUAL:
+                filterFactory = GreaterThanOrEqualFilter::new;
+                break;
+            default:
+                throw new RecordPathException("Expected an Expression of form <value> <operator> <value> to follow '[' Token but found " + operatorTree);
+        }
+
+        return createBinaryOperationFilter(operatorTree, parent, filterFactory, absolute);
+    }
+
+    /** Compiles the left and right operand of a binary operator and combines them with the given factory. */
+    private static RecordPathFilter createBinaryOperationFilter(final Tree operatorTree, final RecordPathSegment parent,
+        final BiFunction<RecordPathSegment, RecordPathSegment, RecordPathFilter> function, final boolean absolute) {
+        final Tree leftOperand = operatorTree.getChild(0);
+        final Tree rightOperand = operatorTree.getChild(1);
+        final RecordPathSegment leftPath = buildPath(leftOperand, parent, absolute);
+        final RecordPathSegment rightPath = buildPath(rightOperand, parent, absolute);
+        return function.apply(leftPath, rightPath);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
new file mode 100644
index 0000000..92ff010
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.StandardRecordPathEvaluationContext;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.Record;
+
+/**
+ * Base class of all segments a compiled RecordPath is made of. A segment knows its own textual form, the
+ * segment that precedes it and whether the path it belongs to is absolute. Equality and hash code are
+ * based on the textual path only.
+ */
+public abstract class RecordPathSegment implements RecordPath {
+    private final String path;
+    private final RecordPathSegment parentPath;
+    private final boolean absolute;
+
+    RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) {
+        this.path = path;
+        this.parentPath = parentPath;
+        this.absolute = absolute;
+    }
+
+    @Override
+    public String getPath() {
+        return path;
+    }
+
+    RecordPathSegment getParentPath() {
+        return parentPath;
+    }
+
+    @Override
+    public String toString() {
+        return getPath();
+    }
+
+    @Override
+    public boolean isAbsolute() {
+        return absolute;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(path);
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed.
+        if (!(obj instanceof RecordPath)) {
+            return false;
+        }
+
+        return getPath().equals(((RecordPath) obj).getPath());
+    }
+
+    /**
+     * Evaluates this path against the given record.
+     *
+     * @param record the record to evaluate against
+     * @return a result exposing this segment's path and the selected fields
+     */
+    @Override
+    public final RecordPathResult evaluate(final Record record) {
+        final RecordPathEvaluationContext context = new StandardRecordPathEvaluationContext(record);
+        return resultOf(evaluate(context));
+    }
+
+    /**
+     * Evaluates this path with the given field value as context node. If the context node is a record, that
+     * record is the one evaluated (a null record yields a result without any fields); otherwise the parent
+     * record of the context node's parent is used, or null when either is absent.
+     *
+     * @param contextNode the field value the evaluation starts from
+     * @return a result exposing this segment's path and the selected fields
+     */
+    @Override
+    public final RecordPathResult evaluate(final FieldValue contextNode) {
+        final Record recordToEvaluate;
+        if (Filters.isRecord(contextNode.getField().getDataType(), contextNode.getValue())) {
+            recordToEvaluate = (Record) contextNode.getValue();
+            if (recordToEvaluate == null) {
+                return emptyResult();
+            }
+        } else {
+            final FieldValue enclosing = contextNode.getParent().orElse(null);
+            recordToEvaluate = enclosing == null ? null : enclosing.getParentRecord().orElse(null);
+        }
+
+        final RecordPathEvaluationContext context = new StandardRecordPathEvaluationContext(recordToEvaluate);
+        context.setContextNode(contextNode);
+        return resultOf(evaluate(context));
+    }
+
+    public abstract Stream<FieldValue> evaluate(RecordPathEvaluationContext context);
+
+    /** Wraps an already created stream of selected fields together with this segment's path. */
+    private RecordPathResult resultOf(final Stream<FieldValue> selectedFields) {
+        return new RecordPathResult() {
+            @Override
+            public String getPath() {
+                return RecordPathSegment.this.getPath();
+            }
+
+            @Override
+            public Stream<FieldValue> getSelectedFields() {
+                return selectedFields;
+            }
+        };
+    }
+
+    /** Creates a result that selects nothing; every call to getSelectedFields() supplies a new empty stream. */
+    private RecordPathResult emptyResult() {
+        return new RecordPathResult() {
+            @Override
+            public String getPath() {
+                return RecordPathSegment.this.getPath();
+            }
+
+            @Override
+            public Stream<FieldValue> getSelectedFields() {
+                return Stream.empty();
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RootPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RootPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RootPath.java
new file mode 100644
index 0000000..6ec205e
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RootPath.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/**
+ * RecordPath segment that stands for the root of a path: it has an empty textual form, no preceding
+ * segment and is always absolute.
+ */
+public class RootPath extends RecordPathSegment {
+    private static final String PATH = "";
+
+    public RootPath() {
+        super(PATH, null, true);
+    }
+
+    /**
+     * Wraps the record of the evaluation context in a single field value. The field is named "root" and is
+     * typed as a record with the schema of the context's record.
+     *
+     * @param context the evaluation context providing the record
+     * @return a stream holding exactly one field value for the context's record
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final RecordField rootField = new RecordField("root", RecordFieldType.RECORD.getRecordDataType(context.getRecord().getSchema()));
+        final FieldValue rootValue = new StandardFieldValue(context.getRecord(), rootField, null);
+        return Stream.of(rootValue);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java
new file mode 100644
index 0000000..ee57f36
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.MapEntryFieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+
+/**
+ * RecordPath segment that looks up exactly one key of a map.
+ */
+public class SingularMapKeyPath extends RecordPathSegment {
+    private final String mapKey;
+
+    SingularMapKeyPath(final String mapKey, final RecordPathSegment parent, final boolean absolute) {
+        super("[" + mapKey + "]", parent, absolute);
+        this.mapKey = mapKey;
+    }
+
+    /**
+     * Evaluates the parent path, keeps the results that pass the MAP field type filter and replaces each of
+     * them with the entry stored under the configured key.
+     *
+     * @param context the evaluation context handed on to the parent path
+     * @return one map entry value per map produced by the parent path
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return getParentPath().evaluate(context)
+            .filter(Filters.fieldTypeFilter(RecordFieldType.MAP))
+            .map(this::lookUpEntry);
+    }
+
+    /**
+     * Builds the field value for the configured key of the given map: the element field is named
+     * <code>name['key']</code> and typed with the map's value type; the value is whatever the map returns
+     * for the key.
+     */
+    private FieldValue lookUpEntry(final FieldValue mapFieldValue) {
+        final MapDataType mapDataType = (MapDataType) mapFieldValue.getField().getDataType();
+        final DataType entryType = mapDataType.getValueType();
+        final RecordField entryField = new RecordField(mapFieldValue.getField().getFieldName() + "['" + mapKey + "']", entryType);
+
+        final Map<?, ?> entries = (Map<?, ?>) mapFieldValue.getValue();
+        return new MapEntryFieldValue(entries.get(mapKey), entryField, mapFieldValue, mapKey);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardChildPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardChildPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardChildPath.java
new file mode 100644
index 0000000..16513f9
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardChildPath.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.StandardFieldValue;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.Record;
+
+/**
+ * Path segment for the {@code /*} wildcard: expands every Record produced by the parent
+ * path into one field value per schema field that currently has a non-null value.
+ */
+public class WildcardChildPath extends RecordPathSegment {
+
+    WildcardChildPath(final RecordPathSegment parent, final boolean absolute) {
+        super("/*", parent, absolute);
+    }
+
+    /**
+     * @param context the evaluation context handed to the parent path
+     * @return the non-null child fields of every Record the parent path yields
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        final Stream<FieldValue> parents = getParentPath().evaluate(context);
+        // Each parent contributes zero or more children, hence flatMap.
+        return parents.flatMap(this::getChildren);
+    }
+
+    /**
+     * Lists the populated fields of the Record held by {@code fieldValue}.
+     *
+     * @param fieldValue a parent result; may be null or hold a non-Record value
+     * @return an empty stream unless the value is a non-null Record, otherwise one
+     *         {@link StandardFieldValue} per schema field whose value is not null
+     */
+    private Stream<FieldValue> getChildren(final FieldValue fieldValue) {
+        final boolean holdsRecord = fieldValue != null
+            && fieldValue.getValue() != null
+            && Filters.isRecord(fieldValue);
+        if (!holdsRecord) {
+            return Stream.empty();
+        }
+
+        final Record record = (Record) fieldValue.getValue();
+        final Stream<Optional<FieldValue>> candidates = record.getSchema().getFields().stream()
+            .map(field -> {
+                final Object childValue = record.getValue(field);
+                final Optional<FieldValue> child = childValue == null
+                    ? Optional.empty()
+                    : Optional.of(new StandardFieldValue(childValue, field, fieldValue));
+                return child;
+            });
+
+        // Drop the fields that had no value.
+        return Filters.presentValues(candidates);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java
new file mode 100644
index 0000000..c2ce474
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.paths;
+
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.ArrayIndexFieldValue;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.MapEntryFieldValue;
+import org.apache.nifi.record.path.RecordPathEvaluationContext;
+import org.apache.nifi.record.path.util.Filters;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+
+/**
+ * Path segment for the {@code [*]} wildcard: expands every MAP or ARRAY field produced by
+ * the parent path into one field value per map entry or array element.
+ */
+public class WildcardIndexPath extends RecordPathSegment {
+
+    WildcardIndexPath(final RecordPathSegment parent, final boolean absolute) {
+        super("[*]", parent, absolute);
+    }
+
+    /**
+     * @param context the evaluation context handed to the parent path
+     * @return every entry or element of each non-null MAP or ARRAY the parent path yields
+     */
+    @Override
+    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
+        return getParentPath().evaluate(context)
+            .filter(Filters.fieldTypeFilter(RecordFieldType.MAP, RecordFieldType.ARRAY))
+            .flatMap(this::streamElements);
+    }
+
+    /**
+     * Expands one MAP or ARRAY field value into its entries or elements.
+     * A null value yields an empty stream.
+     */
+    private Stream<FieldValue> streamElements(final FieldValue parent) {
+        final RecordFieldType parentType = parent.getField().getDataType().getFieldType();
+        final Object value = parent.getValue();
+        if (value == null) {
+            return Stream.empty();
+        }
+
+        // The upstream filter only lets MAP and ARRAY through, so "not MAP" means ARRAY.
+        return parentType == RecordFieldType.MAP
+            ? streamMapEntries(parent, value)
+            : streamArrayElements(parent, value);
+    }
+
+    /**
+     * Emits one {@link MapEntryFieldValue} per entry, named {@code parentField['key']}.
+     */
+    @SuppressWarnings("unchecked")
+    private Stream<FieldValue> streamMapEntries(final FieldValue parent, final Object value) {
+        final Map<String, ?> entries = (Map<String, ?>) value;
+        return entries.entrySet().stream()
+            .map(entry -> {
+                final DataType valueType = ((MapDataType) parent.getField().getDataType()).getValueType();
+                final String entryName = parent.getField().getFieldName() + "['" + entry.getKey() + "']";
+                final RecordField entryField = new RecordField(entryName, valueType);
+                return new MapEntryFieldValue(entry.getValue(), entryField, parent, entry.getKey());
+            });
+    }
+
+    /**
+     * Emits one {@link ArrayIndexFieldValue} per element, named {@code parentField[index]}.
+     */
+    private Stream<FieldValue> streamArrayElements(final FieldValue parent, final Object value) {
+        final Object[] elements = (Object[]) value;
+        return IntStream.range(0, elements.length)
+            .mapToObj(position -> {
+                final DataType elementType = ((ArrayDataType) parent.getField().getDataType()).getElementType();
+                final String elementName = parent.getField().getFieldName() + "[" + position + "]";
+                final RecordField elementField = new RecordField(elementName, elementType);
+                return new ArrayIndexFieldValue(elements[position], elementField, parent, position);
+            });
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/Filters.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/Filters.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/Filters.java
new file mode 100644
index 0000000..4800d59
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/Filters.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.util;
+
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public class Filters {
+
+ public static Predicate<FieldValue> fieldTypeFilter(final RecordFieldType fieldType, final RecordFieldType... alternativeTypes) {
+ return fieldVal -> {
+ final RecordFieldType recordFieldType = fieldVal.getField().getDataType().getFieldType();
+ if (recordFieldType == fieldType) {
+ return true;
+ }
+
+ for (final RecordFieldType alternate : alternativeTypes) {
+ if (recordFieldType == alternate) {
+ return true;
+ }
+ }
+
+ return false;
+ };
+ }
+
+ public static <T> Stream<T> presentValues(final Stream<Optional<T>> stream) {
+ return stream.filter(opt -> opt.isPresent())
+ .map(opt -> opt.get());
+ }
+
+ public static boolean isRecord(final FieldValue fieldValue) {
+ final DataType dataType = fieldValue.getField().getDataType();
+ final Object value = fieldValue.getValue();
+ return isRecord(dataType, value);
+ }
+
+ public static boolean isRecord(final DataType dataType, final Object value) {
+ if (dataType.getFieldType() == RecordFieldType.RECORD) {
+ return true;
+ }
+
+ if (value == null) {
+ return false;
+ }
+
+ if (value instanceof Record) {
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java
new file mode 100644
index 0000000..243ad11
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.nifi.record.path.RecordPath;
+
+/**
+ * Bounded cache of compiled {@link RecordPath}s keyed by their textual form.
+ * <p>
+ * All access to the backing map happens inside {@code synchronized (this)} blocks. When the
+ * cache grows beyond its configured size, the entry that was inserted first is evicted
+ * (the backing {@link LinkedHashMap} uses its default insertion order).
+ */
+public class RecordPathCache {
+    private final Map<String, RecordPath> compiledRecordPaths;
+
+    /**
+     * @param cacheSize the maximum number of compiled paths to retain; a value of zero or
+     *            less means nothing is retained
+     */
+    public RecordPathCache(final int cacheSize) {
+        compiledRecordPaths = new LinkedHashMap<String, RecordPath>() {
+            @Override
+            protected boolean removeEldestEntry(final Map.Entry<String, RecordPath> eldest) {
+                // Invoked AFTER the new entry has been inserted, so evict only once the map
+                // exceeds the limit. Using ">=" here would cap the cache at cacheSize - 1
+                // entries and make a cache of size 1 retain nothing at all.
+                return size() > cacheSize;
+            }
+        };
+    }
+
+    /**
+     * Returns the compiled form of {@code path}, compiling and caching it on first use.
+     *
+     * @param path the RecordPath expression to compile
+     * @return the cached instance if one exists, otherwise a newly compiled one
+     */
+    public RecordPath getCompiled(final String path) {
+        RecordPath compiled;
+        synchronized (this) {
+            compiled = compiledRecordPaths.get(path);
+        }
+
+        if (compiled != null) {
+            return compiled;
+        }
+
+        // Compile outside the lock so that compilation does not block other callers.
+        compiled = RecordPath.compile(path);
+
+        synchronized (this) {
+            // Another thread may have cached the same path in the meantime; prefer its
+            // instance so that all callers share one compiled object.
+            final RecordPath existing = compiledRecordPaths.putIfAbsent(path, compiled);
+            if (existing != null) {
+                compiled = existing;
+            }
+        }
+
+        return compiled;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathPropertyNameValidator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathPropertyNameValidator.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathPropertyNameValidator.java
new file mode 100644
index 0000000..0a53335
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathPropertyNameValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.validation;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.exception.RecordPathException;
+
+/**
+ * Validator for properties whose <em>name</em> must be a RecordPath: the subject (property
+ * name) is compiled, and the result is invalid if compilation fails.
+ */
+public class RecordPathPropertyNameValidator implements Validator {
+
+    /**
+     * @param subject the property name, which is the text compiled as a RecordPath
+     * @param input the property value, reported back in the result but not inspected
+     * @param context unused
+     * @return a valid result if {@code subject} compiles, otherwise an invalid result
+     *         whose explanation carries the compiler's message
+     */
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        // Both outcomes report the same input and subject.
+        final ValidationResult.Builder result = new ValidationResult.Builder()
+            .input(input)
+            .subject(subject);
+
+        try {
+            RecordPath.compile(subject);
+        } catch (final RecordPathException e) {
+            return result
+                .valid(false)
+                .explanation("Property Name is not a valid RecordPath value: " + e.getMessage())
+                .build();
+        }
+
+        return result
+            .valid(true)
+            .explanation("Valid RecordPath")
+            .build();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathValidator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathValidator.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathValidator.java
new file mode 100644
index 0000000..ef5e599
--- /dev/null
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathValidator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.record.path.validation;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.exception.RecordPathException;
+
+/**
+ * Validator for properties whose <em>value</em> must be a RecordPath: the input is compiled,
+ * and the result is invalid if compilation fails. Values containing Expression Language
+ * are accepted without compiling.
+ */
+public class RecordPathValidator implements Validator {
+
+    /**
+     * @param subject the property name, used for the Expression Language support check
+     * @param input the property value, which is the text compiled as a RecordPath
+     * @param context used to detect Expression Language support and presence
+     * @return a valid result if Expression Language is in use or {@code input} compiles,
+     *         otherwise an invalid result whose explanation carries the compiler's message
+     */
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        // Every outcome reports the same input and subject.
+        final ValidationResult.Builder result = new ValidationResult.Builder()
+            .input(input)
+            .subject(subject);
+
+        final boolean usesExpressionLanguage = context.isExpressionLanguageSupported(subject)
+            && context.isExpressionLanguagePresent(input);
+        if (usesExpressionLanguage) {
+            return result
+                .valid(true)
+                .explanation("Property uses Expression Language so no further validation is possible")
+                .build();
+        }
+
+        try {
+            RecordPath.compile(input);
+        } catch (final RecordPathException e) {
+            return result
+                .valid(false)
+                .explanation("Property Value is not a valid RecordPath value: " + e.getMessage())
+                .build();
+        }
+
+        return result
+            .valid(true)
+            .explanation("Valid RecordPath")
+            .build();
+    }
+
+}