You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 23:21:40 UTC
[05/20] incubator-nifi git commit: NIFI-506: Initial import of HL7
work
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
new file mode 100644
index 0000000..e4a0d53
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hl7;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import ca.uhn.hl7v2.DefaultHapiContext;
+import ca.uhn.hl7v2.HL7Exception;
+import ca.uhn.hl7v2.HapiContext;
+import ca.uhn.hl7v2.model.Composite;
+import ca.uhn.hl7v2.model.Group;
+import ca.uhn.hl7v2.model.Message;
+import ca.uhn.hl7v2.model.Primitive;
+import ca.uhn.hl7v2.model.Segment;
+import ca.uhn.hl7v2.model.Structure;
+import ca.uhn.hl7v2.model.Type;
+import ca.uhn.hl7v2.model.Varies;
+import ca.uhn.hl7v2.parser.PipeParser;
+import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
+
+
+/**
+ * Processor that parses the content of a FlowFile as a pipe-delimited HL7 message and adds
+ * each non-empty field of every segment to the FlowFile as an attribute.
+ *
+ * <p>Attributes are named {@code <Segment Name>.<Field Index>}. When the segment, or the group
+ * directly containing it, is repeating, the running count of segments with that name is added:
+ * {@code <Segment Name>_<Segment Index>.<Field Index>}. Repetitions and components within a
+ * field are joined with {@code ^}; empty ones are skipped.</p>
+ */
+@SideEffectFree
+@SupportsBatching
+@Tags({"HL7", "health level 7", "healthcare", "extract", "attributes"})
+@CapabilityDescription("Extracts information from an HL7 (Health Level 7) formatted FlowFile and adds the information as FlowFile Attributes. "
+        + "The attributes are named as <Segment Name> <dot> <Field Index>. If the segment is repeating, the naming will be "
+        + "<Segment Name> <underscore> <Segment Index> <dot> <Field Index>. For example, we may have an attribute named \"MSH.12\" with "
+        + "a value of \"2.1\" and an attribute named \"OBX_11.3\" with a value of \"93000^CPT4\".")
+public class ExtractHL7Attributes extends AbstractProcessor {
+    /** Character set used to decode the FlowFile content before it is handed to the HL7 parser. */
+    public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+        .name("Character Encoding")
+        .description("The Character Encoding that is used to encode the HL7 data")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    /** Destination for FlowFiles whose content was parsed and whose attributes were added. */
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("A FlowFile is routed to this relationship if it is properly parsed as HL7 and its attributes extracted")
+        .build();
+
+    /** Destination for FlowFiles for which parsing or extraction raised an {@link HL7Exception}. */
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("A FlowFile is routed to this relationship if it cannot be mapped to FlowFile Attributes. This would happen if the FlowFile does not contain valid HL7 data")
+        .build();
+
+
+    /**
+     * @return a new list holding the single supported property, {@link #CHARACTER_SET}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(CHARACTER_SET);
+        return properties;
+    }
+
+    /**
+     * @return a new set holding {@link #REL_SUCCESS} and {@link #REL_FAILURE}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    /**
+     * Reads one FlowFile, parses its content as HL7 and attaches the extracted fields as attributes.
+     * The FlowFile goes to {@link #REL_SUCCESS} when extraction completes and to {@link #REL_FAILURE}
+     * when an {@link HL7Exception} is raised. Nothing happens when no FlowFile is available.
+     *
+     * @param context provides the configured {@link #CHARACTER_SET}
+     * @param session used to read, update and transfer the FlowFile
+     * @throws ProcessException declared by the overridden method; not thrown directly here
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue());
+
+        // The parser consumes a String, so the complete content is buffered in memory first.
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, buffer);
+            }
+        });
+
+        // NOTE(review): a context is created per invocation and never closed; the resource warning is
+        // suppressed on purpose in the original code - confirm whether closing it here is safe.
+        @SuppressWarnings("resource")
+        final HapiContext hapiContext = new DefaultHapiContext();
+        hapiContext.setValidationContext(ValidationContextFactory.noValidation());
+
+        final PipeParser parser = hapiContext.getPipeParser();
+        final String hl7Text = new String(buffer, charset);
+        try {
+            final Message message = parser.parse(hl7Text);
+            final Group group = message.getParent();
+
+            final Map<String, String> attributes = new HashMap<>();
+            extractAttributes(group, attributes);
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            getLogger().info("Successfully extracted {} attributes for {}; routing to success", new Object[] {attributes.size(), flowFile});
+            getLogger().debug("Added the following attributes for {}: {}", new Object[] {flowFile, attributes});
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final HL7Exception e) {
+            getLogger().error("Failed to extract attributes from {} due to {}", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Collects the attributes of every segment reachable from the given group, starting with
+     * fresh per-segment-name counters.
+     *
+     * @param group the group to walk
+     * @param attributes receives the extracted attribute names and values
+     * @throws HL7Exception if the HL7 model cannot be navigated
+     */
+    private void extractAttributes(final Group group, final Map<String, String> attributes) throws HL7Exception {
+        extractAttributes(group, attributes, new HashMap<String, Integer>());
+    }
+
+    /**
+     * Recursively walks the group: nested groups are descended into, segments are converted to attributes.
+     *
+     * @param group the group to walk; an empty group contributes nothing
+     * @param attributes receives the extracted attribute names and values
+     * @param segmentCounts number of segments seen so far per segment name; shared across the
+     *                      whole recursion so the index keeps counting across groups
+     * @throws HL7Exception if the HL7 model cannot be navigated
+     */
+    private void extractAttributes(final Group group, final Map<String, String> attributes, final Map<String, Integer> segmentCounts) throws HL7Exception {
+        if (group.isEmpty()) {
+            return;
+        }
+
+        for (final String structName : group.getNames()) {
+            final Structure[] subStructures = group.getAll(structName);
+
+            if (group.isGroup(structName)) {
+                for (final Structure subStructure : subStructures) {
+                    extractAttributes((Group) subStructure, attributes, segmentCounts);
+                }
+                continue;
+            }
+
+            for (final Structure structure : subStructures) {
+                final Segment segment = (Segment) structure;
+                final String segmentName = segment.getName();
+
+                // Every segment is counted, even when the index does not end up in the attribute name.
+                final Integer previousCount = segmentCounts.get(segmentName);
+                final Integer segmentNum = (previousCount == null) ? 1 : previousCount + 1;
+                segmentCounts.put(segmentName, segmentNum);
+
+                final Group parent = segment.getParent();
+                final boolean segmentRepeating = parent.isRepeating(segmentName);
+                // The identity check skips the case where the parent reports itself as its own parent.
+                final boolean parentRepeating = (parent.getParent() != parent && parent.getParent().isRepeating(parent.getName()));
+                final boolean useSegmentIndex = segmentRepeating || parentRepeating;
+
+                attributes.putAll(getAttributes(segment, useSegmentIndex ? segmentNum : null));
+            }
+        }
+    }
+
+
+    /**
+     * Builds the attributes for a single segment, one per field that has a non-empty value.
+     *
+     * @param segment the segment whose fields are read (field indexes start at 1)
+     * @param segmentNum index to embed in the attribute name, or {@code null} to omit it
+     * @return map of attribute name to the field's values joined with {@code ^}
+     * @throws HL7Exception if a field cannot be read from the segment
+     */
+    private Map<String, String> getAttributes(final Segment segment, final Integer segmentNum) throws HL7Exception {
+        final Map<String, String> attributes = new HashMap<>();
+        final String namePrefix = segment.getName() + (segmentNum == null ? "" : "_" + segmentNum) + ".";
+
+        for (int i = 1; i <= segment.numFields(); i++) {
+            final StringBuilder sb = new StringBuilder();
+            for (final Type type : segment.getField(i)) {
+                appendNonEmpty(sb, getValue(type));
+            }
+
+            // Fields without any value do not produce an attribute.
+            if (sb.length() > 0) {
+                attributes.put(namePrefix + i, sb.toString());
+            }
+        }
+
+        return attributes;
+    }
+
+
+    /**
+     * Renders a HAPI type as text.
+     *
+     * @param type the value to render; may be {@code null}
+     * @return the primitive's value, the {@code ^}-joined non-empty components of a composite, the
+     *         rendering of the data wrapped by a {@link Varies}, or the empty string for
+     *         {@code null}, a {@code null} primitive value, or any other type
+     */
+    private String getValue(final Type type) {
+        if (type == null) {
+            return "";
+        }
+
+        if (type instanceof Primitive) {
+            final String value = ((Primitive) type).getValue();
+            return value == null ? "" : value;
+        }
+
+        if (type instanceof Composite) {
+            final StringBuilder sb = new StringBuilder();
+            for (final Type component : ((Composite) type).getComponents()) {
+                appendNonEmpty(sb, getValue(component));
+            }
+            return sb.toString();
+        }
+
+        if (type instanceof Varies) {
+            return getValue(((Varies) type).getData());
+        }
+
+        return "";
+    }
+
+    /**
+     * Appends a value to the builder, separated from earlier content by {@code ^}; empty values are ignored.
+     *
+     * @param sb the builder collecting the joined text
+     * @param value the value to append; must not be {@code null}
+     */
+    private static void appendNonEmpty(final StringBuilder sb, final String value) {
+        if (value.isEmpty()) {
+            return;
+        }
+        if (sb.length() > 0) {
+            sb.append("^");
+        }
+        sb.append(value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
new file mode 100644
index 0000000..c8c4176
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hl7;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hl7.hapi.HapiMessage;
+import org.apache.nifi.hl7.model.HL7Message;
+import org.apache.nifi.hl7.query.HL7Query;
+import org.apache.nifi.hl7.query.QueryResult;
+import org.apache.nifi.hl7.query.exception.HL7QueryParsingException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import ca.uhn.hl7v2.DefaultHapiContext;
+import ca.uhn.hl7v2.HapiContext;
+import ca.uhn.hl7v2.model.Message;
+import ca.uhn.hl7v2.parser.PipeParser;
+import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
+
+/**
+ * Processor that evaluates user-supplied HL7 queries against the content of a FlowFile.
+ *
+ * <p>Each dynamic property defines one query and one relationship carrying the property's name.
+ * For every query that matches, a clone of the FlowFile is sent to that relationship; the incoming
+ * FlowFile itself goes to {@code original}, or to {@code failure} when it cannot be parsed.</p>
+ */
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"HL7", "healthcare", "route", "Health Level 7"})
+@DynamicProperties({@DynamicProperty(name="Name of a Relationship", value="An HL7 Query Language query", description="If a FlowFile matches the query, it will be routed to a relationship with the name of the property")})
+@WritesAttributes({@WritesAttribute(attribute="RouteHL7.Route", description="The name of the relationship to which the FlowFile was routed")})
+@CapabilityDescription("Routes incoming HL7 data according to user-defined queries. To add a query, add a new property to the processor."
+        + " The name of the property will become a new relationship for the processor, and the value is an HL7 Query Language query. If"
+        + " a FlowFile matches the query, a copy of the FlowFile will be routed to the associated relationship.")
+public class RouteHL7 extends AbstractProcessor {
+    /** Character set used to decode the FlowFile content before parsing. */
+    public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+        .name("Character Encoding")
+        .description("The Character Encoding that is used to encode the HL7 data")
+        .required(true)
+        .expressionLanguageSupported(true)
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .build();
+
+    /** Destination for FlowFiles whose content could not be parsed. */
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that cannot be parsed as HL7 will be routed to this relationship")
+        .build();
+    /** Destination for the incoming FlowFile once all queries have been evaluated. */
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+        .name("original")
+        .description("The original FlowFile that comes into this processor will be routed to this relationship, unless it is routed to 'failure'")
+        .build();
+
+    // Replaced wholesale on every change (never mutated in place), hence the volatile reference.
+    private volatile Map<Relationship, HL7Query> queries = new HashMap<>();
+
+    /**
+     * Describes a dynamic property: optional, validated by {@link HL7QueryValidator}.
+     *
+     * @param propertyDescriptorName name of the property, which is also the relationship name
+     * @return the descriptor for that property
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        final String description = "Specifies a query that will cause any HL7 message matching the query to be routed to the '" + propertyDescriptorName + "' relationship";
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .description(description)
+            .required(false)
+            .dynamic(true)
+            .addValidator(new HL7QueryValidator())
+            .build();
+    }
+
+    /**
+     * @return a new list holding the single static property, {@link #CHARACTER_SET}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(CHARACTER_SET);
+        return descriptors;
+    }
+
+    /**
+     * Keeps the query map in step with the dynamic properties: a {@code null} new value removes the
+     * entry for the property's relationship, any other value is compiled and stored. Changes to
+     * non-dynamic properties are ignored.
+     *
+     * @param descriptor the property that changed
+     * @param oldValue the previous value (unused)
+     * @param newValue the new value, or {@code null} when the property was removed
+     */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (descriptor.isDynamic()) {
+            // Work on a copy so readers of the current map never observe a partial update.
+            final Map<Relationship, HL7Query> replacement = new HashMap<>(queries);
+            final Relationship target = new Relationship.Builder().name(descriptor.getName()).build();
+
+            if (newValue != null) {
+                replacement.put(target, HL7Query.compile(newValue));
+            } else {
+                replacement.remove(target);
+            }
+
+            this.queries = replacement;
+        }
+    }
+
+    /**
+     * @return a new set with one relationship per configured query plus
+     *         {@link #REL_FAILURE} and {@link #REL_ORIGINAL}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> available = new HashSet<>(queries.keySet());
+        available.add(REL_FAILURE);
+        available.add(REL_ORIGINAL);
+        return available;
+    }
+
+    /**
+     * Parses one FlowFile as HL7 and evaluates every configured query against it. A clone tagged
+     * with the {@code RouteHL7.Route} attribute is transferred for each matching query; the incoming
+     * FlowFile goes to {@link #REL_ORIGINAL}, or to {@link #REL_FAILURE} when parsing fails.
+     *
+     * @param context provides the configured {@link #CHARACTER_SET}
+     * @param session used to read, clone and transfer FlowFiles
+     * @throws ProcessException declared by the overridden method; not thrown directly here
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String charsetName = context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue();
+        final Charset charset = Charset.forName(charsetName);
+
+        // The parser consumes a String, so the complete content is buffered in memory first.
+        final byte[] content = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, content);
+            }
+        });
+
+        @SuppressWarnings("resource")
+        final HapiContext hapiContext = new DefaultHapiContext();
+        hapiContext.setValidationContext(ValidationContextFactory.noValidation());
+
+        final PipeParser parser = hapiContext.getPipeParser();
+        final String hl7Text = new String(content, charset);
+        final HL7Message message;
+        try {
+            message = new HapiMessage(parser.parse(hl7Text));
+        } catch (final Exception e) {
+            getLogger().error("Failed to parse {} as HL7 due to {}; routing to failure", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final Set<String> matchedNames = new HashSet<>();
+        for (final Map.Entry<Relationship, HL7Query> candidate : queries.entrySet()) {
+            final QueryResult outcome = candidate.getValue().evaluate(message);
+            if (!outcome.isMatch()) {
+                continue;
+            }
+
+            final Relationship destination = candidate.getKey();
+            final String destinationName = destination.getName();
+            final FlowFile copy = session.putAttribute(session.clone(flowFile), "RouteHL7.Route", destinationName);
+            session.transfer(copy, destination);
+            session.getProvenanceReporter().route(copy, destination);
+            matchedNames.add(destinationName);
+        }
+
+        session.transfer(flowFile, REL_ORIGINAL);
+        getLogger().info("Routed a copy of {} to {} relationships: {}", new Object[] {flowFile, matchedNames.size(), matchedNames});
+    }
+
+    /**
+     * Accepts a property value only when it compiles as an HL7 query whose sole return type is
+     * assignable to {@link HL7Message}.
+     */
+    private static class HL7QueryValidator implements Validator {
+
+        /**
+         * @param subject what is being validated, echoed into the result
+         * @param input the query text to check
+         * @param context validation context (unused)
+         * @return a valid result, or an invalid one explaining the return-type or parsing problem
+         */
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            String problem = null;
+
+            try {
+                final List<Class<?>> returnTypes = HL7Query.compile(input).getReturnTypes();
+                // Short-circuit keeps get(0) from running unless there is exactly one return type.
+                if (returnTypes.size() != 1 || !HL7Message.class.isAssignableFrom(returnTypes.get(0))) {
+                    problem = "RouteHL7 requires that the HL7 Query return exactly 1 element of type MESSAGE";
+                }
+            } catch (final HL7QueryParsingException e) {
+                problem = e.toString();
+            }
+
+            final ValidationResult.Builder result = new ValidationResult.Builder().subject(subject).input(input);
+            if (problem != null) {
+                return result.valid(false).explanation(problem).build();
+            }
+            return result.valid(true).build();
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..3f57ff0
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.hl7.ExtractHL7Attributes
+org.apache.nifi.processors.hl7.RouteHL7
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java
new file mode 100644
index 0000000..f566288
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hl7;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.nifi.processors.hl7.ExtractHL7Attributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link ExtractHL7Attributes}: runs the processor on a sample message and
+ * checks that the FlowFile is routed to success.
+ */
+public class TestExtractHL7Attributes {
+
+    /**
+     * Enqueues {@code src/test/resources/1.hl7}, runs the processor once, expects exactly one
+     * FlowFile on the success relationship and prints its attributes in key order.
+     *
+     * @throws IOException if the sample file cannot be read
+     */
+    @Test
+    public void testExtract() throws IOException {
+        // Turn on debug output for NiFi loggers for this run.
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
+
+        final TestRunner testRunner = TestRunners.newTestRunner(ExtractHL7Attributes.class);
+        testRunner.enqueue(Paths.get("src/test/resources/1.hl7"));
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ExtractHL7Attributes.REL_SUCCESS, 1);
+        final MockFlowFile extracted = testRunner.getFlowFilesForRelationship(ExtractHL7Attributes.REL_SUCCESS).get(0);
+
+        // A TreeMap copy makes the printed attribute order stable.
+        final SortedMap<String, String> ordered = new TreeMap<>(extracted.getAttributes());
+        for (final Map.Entry<String, String> attribute : ordered.entrySet()) {
+            final String line = attribute.getKey() + " : " + attribute.getValue();
+            System.out.println(line);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/1.hl7
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/1.hl7 b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/1.hl7
new file mode 100644
index 0000000..bf2b8a5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/1.hl7
@@ -0,0 +1,16 @@
+MSH|^~`&|ECG REPORTING|ROCHESTER|ERIS|ROCHESTER|20110621050440||ORU^R01|20110621050440|P|2.1
+PID|||999999999||TEST^PATIENT||18450101|F
+OBR|||211088491|0^ADULT^ROCHECG|||20110620170631|||||||||M999999^^^^^^^RACFID||||||20110621060232||EC|F|||||||M999999^LASTNAME MD^FIRSTNAME^^^^^RACFID
+OBX||ST|93000.2^VENTRICULAR RATE EKG/MIN^CPT4|1|52|/SEC
+OBX||ST|93000.4^PR INTERVAL(MSEC)^CPT4|2|208|MSEC
+OBX||ST|93000.5^QRS - INTERVAL(MSEC)^CPT4|3|88|MSEC
+OBX||ST|93000.6^QT - INTERVAL(MSEC)^CPT4|4|466|MSEC
+OBX||ST|93000&PTL^PHYSICAL TEST LOCATION^CPT4|5|STMA
+OBX||ST|93000&PTR^PHYSICAL TEST ROOM^CPT4|6|04254
+OBX||CE|93000.17^^CPT4|7|21&101^Sinus bradycardia`T`with 1st degree A-V block^MEIECG
+OBX||CE|93000.17^^CPT4|8|1687^Otherwise normal ECG^MEIECG
+OBX||CE|93000&CMP^^CPT4|9|1301^When compared with ECG of^MEIECG
+OBX||TS|93000&CMD^EKG COMPARISON DATE^CPT4|10|201106171659
+OBX||CE|93000&CMP^^CPT4|11|1305^No significant change was found^MEIECG
+OBX||TX|93000.48^EKG COMMENT^CPT4|12|9917^LASTNAME MD^FIRSTNAME
+OBX||FT|93000^ECG 12-LEAD^CPT4|13|{\rtf1\ansi \deff1\deflang1033\ {\fonttbl{\f1\fmodern\fcharset0 Courier;}{\f2\fmodern\fcharset0 Courier;}} \pard\plain \f1\fs18\par 20Jun2011 17:06\par VENTRICULAR RATE 52\par Sinus bradycardia with 1st degree A-V block\par Otherwise normal ECG\par When compared with ECG of 17-JUN-2011 16:59,\par No significant change was found\par 47507`S`'LASTNAME MD`S`'FIRSTNAME \par }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/hypoglycemia.hl7
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/hypoglycemia.hl7 b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/hypoglycemia.hl7
new file mode 100644
index 0000000..02e8967
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/hypoglycemia.hl7
@@ -0,0 +1,5 @@
+MSH|^~\&|CERNER||PriorityHealth||||ORU^R01|Q479004375T431430612|P|2.3|
+PID|||001677980||SMITH^CURTIS||19680219|M||||||||||929645156318|123456789|
+PD1||||1234567890^LAST^FIRST^M^^^^^NPI|
+OBR|1|341856649^HNAM_ORDERID|000002006326002362|648088^Basic Metabolic Panel|||20061122151600|||||||||1620^Hooker^Robert^L||||||20061122154733|||F|||||||||||20061122140000|
+OBX|1|NM|GLU^Glucose Lvl|59|mg/dL|65-99^65^99|L|||F|||20061122154733|
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-hl7-bundle/pom.xml
new file mode 100644
index 0000000..3f9fbce
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-bundles</artifactId>
+ <version>0.1.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-hl7-bundle</artifactId>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>nifi-hl7-processors</module>
+ <module>nifi-hl7-nar</module>
+ </modules>
+
+</project>