You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:44:25 UTC

[50/62] [abbrv] incubator-nifi git commit: NIFI-506: Initial import of HL7 work

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
new file mode 100644
index 0000000..e4a0d53
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hl7;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import ca.uhn.hl7v2.DefaultHapiContext;
+import ca.uhn.hl7v2.HL7Exception;
+import ca.uhn.hl7v2.HapiContext;
+import ca.uhn.hl7v2.model.Composite;
+import ca.uhn.hl7v2.model.Group;
+import ca.uhn.hl7v2.model.Message;
+import ca.uhn.hl7v2.model.Primitive;
+import ca.uhn.hl7v2.model.Segment;
+import ca.uhn.hl7v2.model.Structure;
+import ca.uhn.hl7v2.model.Type;
+import ca.uhn.hl7v2.model.Varies;
+import ca.uhn.hl7v2.parser.PipeParser;
+import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
+
+
+/**
+ * NiFi processor that parses the content of a FlowFile as pipe-delimited HL7 and copies every
+ * non-empty segment field onto the FlowFile as an attribute.
+ * <p>
+ * Attribute names have the form {@code <segment>.<field index>}, or
+ * {@code <segment>_<segment count>.<field index>} when the segment (or the group that directly
+ * contains it) is reported as repeating by its parent. The segment count is kept per segment name
+ * across the whole message and starts at 1.
+ * <p>
+ * FlowFiles that parse are routed to {@link #REL_SUCCESS}; FlowFiles whose parsing or traversal
+ * raises an {@link HL7Exception} are routed to {@link #REL_FAILURE}.
+ */
+@SideEffectFree
+@SupportsBatching
+@Tags({"HL7", "health level 7", "healthcare", "extract", "attributes"})
+@CapabilityDescription("Extracts information from an HL7 (Health Level 7) formatted FlowFile and adds the information as FlowFile Attributes. "
+		+ "The attributes are named as <Segment Name> <dot> <Field Index>. If the segment is repeating, the naming will be "
+		+ "<Segment Name> <underscore> <Segment Index> <dot> <Field Index>. For example, we may have an attribute named \"MSH.12\" with "
+		+ "a value of \"2.1\" and an attribute named \"OBX_11.3\" with a value of \"93000^CPT4\".")
+public class ExtractHL7Attributes extends AbstractProcessor {
+	/** Name of the character set used to decode the FlowFile content before parsing. */
+	public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+		.name("Character Encoding")
+		.description("The Character Encoding that is used to encode the HL7 data")
+		.required(true)
+		.expressionLanguageSupported(true)
+		.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+		.defaultValue("UTF-8")
+		.build();
+
+	/** Destination for FlowFiles whose content parsed as HL7 and whose attributes were added. */
+	public static final Relationship REL_SUCCESS = new Relationship.Builder()
+		.name("success")
+		.description("A FlowFile is routed to this relationship if it is properly parsed as HL7 and its attributes extracted")
+		.build();
+
+	/** Destination for FlowFiles whose content could not be parsed as HL7. */
+	public static final Relationship REL_FAILURE = new Relationship.Builder()
+		.name("failure")
+		.description("A FlowFile is routed to this relationship if it cannot be mapped to FlowFile Attributes. This would happen if the FlowFile does not contain valid HL7 data")
+		.build();
+
+	/** Separator placed between the non-empty values that are joined into one attribute value. */
+	private static final String VALUE_SEPARATOR = "^";
+
+
+	/**
+	 * @return a new list holding the single supported property, {@link #CHARACTER_SET}
+	 */
+	@Override
+	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+		final List<PropertyDescriptor> properties = new ArrayList<>();
+		properties.add(CHARACTER_SET);
+		return properties;
+	}
+
+	/**
+	 * @return a new set holding {@link #REL_SUCCESS} and {@link #REL_FAILURE}
+	 */
+	@Override
+	public Set<Relationship> getRelationships() {
+		final Set<Relationship> relationships = new HashSet<>();
+		relationships.add(REL_SUCCESS);
+		relationships.add(REL_FAILURE);
+		return relationships;
+	}
+
+	/**
+	 * Takes one FlowFile from the session, decodes its content with the configured character set,
+	 * parses it with a HAPI {@link PipeParser} (validation disabled) and stores the extracted
+	 * fields as attributes.
+	 *
+	 * @param context supplies the {@link #CHARACTER_SET} property, evaluated against the FlowFile
+	 * @param session used to obtain, read, update and transfer the FlowFile
+	 * @throws ProcessException declared by the processor contract; not thrown directly by this method
+	 */
+	@Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+		FlowFile flowFile = session.get();
+		if ( flowFile == null ) {
+			return;
+		}
+
+		final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue());
+
+		// The whole content is buffered in memory; the cast assumes the size fits in an int.
+		final byte[] buffer = new byte[(int) flowFile.getSize()];
+		session.read(flowFile, new InputStreamCallback() {
+			@Override
+			public void process(final InputStream in) throws IOException {
+				StreamUtils.fillBuffer(in, buffer);
+			}
+		});
+
+		@SuppressWarnings("resource")
+		final HapiContext hapiContext = new DefaultHapiContext();
+		hapiContext.setValidationContext(ValidationContextFactory.noValidation());
+
+		final PipeParser parser = hapiContext.getPipeParser();
+		final String hl7Text = new String(buffer, charset);
+		try {
+			final Message message = parser.parse(hl7Text);
+			final Group group = message.getParent();
+
+			final Map<String, String> attributes = new HashMap<>();
+			extractAttributes(group, attributes);
+			flowFile = session.putAllAttributes(flowFile, attributes);
+			getLogger().info("Successfully extracted {} attributes for {}; routing to success", new Object[] {attributes.size(), flowFile});
+			getLogger().debug("Added the following attributes for {}: {}", new Object[] {flowFile, attributes});
+			session.transfer(flowFile, REL_SUCCESS);
+		} catch (final HL7Exception e) {
+			getLogger().error("Failed to extract attributes from {} due to {}", new Object[] {flowFile, e});
+			session.transfer(flowFile, REL_FAILURE);
+		}
+	}
+
+	/**
+	 * Collects the attributes of every segment reachable from {@code group}, starting with empty
+	 * per-segment counters.
+	 *
+	 * @param group the group to traverse
+	 * @param attributes receives the extracted attribute names and values
+	 * @throws HL7Exception if the HAPI model reports an error while being traversed
+	 */
+	private void extractAttributes(final Group group, final Map<String, String> attributes) throws HL7Exception {
+		extractAttributes(group, attributes, new HashMap<String, Integer>());
+	}
+
+	/**
+	 * Recursively walks {@code group}: nested groups are descended into, segments are converted to
+	 * attributes.
+	 *
+	 * @param group the group to traverse; nothing is done when it is empty
+	 * @param attributes receives the extracted attribute names and values
+	 * @param segmentCounts number of segments seen so far per segment name, shared across the recursion
+	 * @throws HL7Exception if the HAPI model reports an error while being traversed
+	 */
+	private void extractAttributes(final Group group, final Map<String, String> attributes, final Map<String, Integer> segmentCounts) throws HL7Exception {
+		if ( group.isEmpty() ) {
+			return;
+		}
+
+		for ( final String structName : group.getNames() ) {
+			final Structure[] subStructures = group.getAll(structName);
+
+			if ( group.isGroup(structName) ) {
+				for ( final Structure subStructure : subStructures ) {
+					extractAttributes((Group) subStructure, attributes, segmentCounts);
+				}
+				continue;
+			}
+
+			for ( final Structure structure : subStructures ) {
+				final Segment segment = (Segment) structure;
+				final String segmentName = segment.getName();
+
+				// Every occurrence is counted, whether or not the count ends up in the attribute name.
+				final Integer previousCount = segmentCounts.get(segmentName);
+				final Integer segmentNum = (previousCount == null) ? 1 : previousCount + 1;
+				segmentCounts.put(segmentName, segmentNum);
+
+				final Group parent = segment.getParent();
+				final Group grandParent = parent.getParent();
+				final boolean segmentRepeating = parent.isRepeating(segmentName);
+				// The identity check skips the lookup when the parent is its own parent.
+				final boolean parentRepeating = (grandParent != parent && grandParent.isRepeating(parent.getName()));
+				final boolean useSegmentIndex = segmentRepeating || parentRepeating;
+
+				attributes.putAll(getAttributes(segment, useSegmentIndex ? segmentNum : null));
+			}
+		}
+	}
+
+
+	/**
+	 * Builds the attributes for a single segment. Fields are numbered from 1; the values of all
+	 * entries returned for a field are joined with {@code ^}, skipping empty ones, and a field with
+	 * no non-empty value produces no attribute.
+	 *
+	 * @param segment the segment to convert
+	 * @param segmentNum the index to embed in the attribute name, or {@code null} to omit it
+	 * @return a new map from attribute name to joined field value
+	 * @throws HL7Exception if a field cannot be retrieved from the segment
+	 */
+	private Map<String, String> getAttributes(final Segment segment, final Integer segmentNum) throws HL7Exception {
+		final Map<String, String> attributes = new HashMap<>();
+
+		for (int i=1; i <= segment.numFields(); i++) {
+			final String fieldName = segment.getName() + (segmentNum == null ? "" : "_" + segmentNum) + "." + i;
+			final StringBuilder sb = new StringBuilder();
+			for ( final Type type : segment.getField(i) ) {
+				appendNonEmpty(sb, getValue(type));
+			}
+
+			if ( sb.length() > 0 ) {
+				attributes.put(fieldName, sb.toString());
+			}
+		}
+
+		return attributes;
+	}
+
+
+	/**
+	 * Renders a HAPI value as text: a primitive yields its value, a composite yields its non-empty
+	 * component values joined with {@code ^}, and a {@link Varies} yields the rendering of the data
+	 * it wraps.
+	 *
+	 * @param type the value to render; may be {@code null}
+	 * @return the rendered text, or an empty string for {@code null}, a {@code null} primitive value
+	 *         or any other kind of type
+	 */
+	private String getValue(final Type type) {
+		if ( type == null ) {
+			return "";
+		}
+
+		if ( type instanceof Primitive ) {
+			final String value = ((Primitive) type).getValue();
+			return value == null ? "" : value;
+		} else if ( type instanceof Composite ) {
+			final StringBuilder sb = new StringBuilder();
+			for ( final Type component : ((Composite) type).getComponents() ) {
+				appendNonEmpty(sb, getValue(component));
+			}
+			return sb.toString();
+		} else if ( type instanceof Varies ) {
+			return getValue(((Varies) type).getData());
+		}
+
+		return "";
+	}
+
+	/**
+	 * Appends {@code value} to {@code sb} unless it is empty, inserting the separator first when
+	 * {@code sb} already holds a value.
+	 */
+	private static void appendNonEmpty(final StringBuilder sb, final String value) {
+		if ( value.isEmpty() ) {
+			return;
+		}
+
+		// Only non-empty values are ever appended, so a non-empty builder means a value precedes this one.
+		if ( sb.length() > 0 ) {
+			sb.append(VALUE_SEPARATOR);
+		}
+		sb.append(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
new file mode 100644
index 0000000..c8c4176
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hl7;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hl7.hapi.HapiMessage;
+import org.apache.nifi.hl7.model.HL7Message;
+import org.apache.nifi.hl7.query.HL7Query;
+import org.apache.nifi.hl7.query.QueryResult;
+import org.apache.nifi.hl7.query.exception.HL7QueryParsingException;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import ca.uhn.hl7v2.DefaultHapiContext;
+import ca.uhn.hl7v2.HapiContext;
+import ca.uhn.hl7v2.model.Message;
+import ca.uhn.hl7v2.parser.PipeParser;
+import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
+
+/**
+ * NiFi processor that routes HL7 FlowFiles according to user-defined HL7 Query Language queries.
+ * <p>
+ * Each dynamic property defines one relationship (named after the property) and the query that
+ * selects it. For every query that matches a parsed message, a clone of the FlowFile is sent to
+ * that relationship with the {@code RouteHL7.Route} attribute set to the relationship name. The
+ * incoming FlowFile itself goes to {@link #REL_ORIGINAL}, or to {@link #REL_FAILURE} when its
+ * content cannot be parsed.
+ */
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"HL7", "healthcare", "route", "Health Level 7"})
+@DynamicProperties({@DynamicProperty(name="Name of a Relationship", value="An HL7 Query Language query", description="If a FlowFile matches the query, it will be routed to a relationship with the name of the property")})
+@WritesAttributes({@WritesAttribute(attribute="RouteHL7.Route", description="The name of the relationship to which the FlowFile was routed")})
+@CapabilityDescription("Routes incoming HL7 data according to user-defined queries. To add a query, add a new property to the processor."
+		+ " The name of the property will become a new relationship for the processor, and the value is an HL7 Query Language query. If"
+		+ " a FlowFile matches the query, a copy of the FlowFile will be routed to the associated relationship.")
+public class RouteHL7 extends AbstractProcessor {
+	/** Name of the character set used to decode the FlowFile content before parsing. */
+	public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+		.name("Character Encoding")
+		.description("The Character Encoding that is used to encode the HL7 data")
+		.required(true)
+		.expressionLanguageSupported(true)
+		.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+		.defaultValue("UTF-8")
+		.build();
+
+	/** Destination for FlowFiles whose content could not be parsed as HL7. */
+	static final Relationship REL_FAILURE = new Relationship.Builder()
+		.name("failure")
+		.description("Any FlowFile that cannot be parsed as HL7 will be routed to this relationship")
+		.build();
+	/** Destination for every incoming FlowFile that parsed, regardless of which queries matched. */
+	static final Relationship REL_ORIGINAL = new Relationship.Builder()
+		.name("original")
+		.description("The original FlowFile that comes into this processor will be routed to this relationship, unless it is routed to 'failure'")
+		.build();
+
+	/** Attribute written on each routed clone; must stay in sync with the {@code @WritesAttribute} above. */
+	private static final String ROUTE_ATTRIBUTE = "RouteHL7.Route";
+
+	/**
+	 * Compiled query per dynamic relationship. The map is never modified after it is published:
+	 * {@link #onPropertyModified} builds a fresh copy and swaps the reference.
+	 */
+	private volatile Map<Relationship, HL7Query> queries = new HashMap<>();
+
+	/**
+	 * Describes a dynamic property whose value must be a valid HL7 query returning a single message.
+	 *
+	 * @param propertyDescriptorName the property name, which doubles as the relationship name
+	 * @return an optional, dynamic descriptor validated by {@link HL7QueryValidator}
+	 */
+	@Override
+	protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+		return new PropertyDescriptor.Builder()
+			.name(propertyDescriptorName)
+			.description("Specifies a query that will cause any HL7 message matching the query to be routed to the '" + propertyDescriptorName + "' relationship")
+			.required(false)
+			.dynamic(true)
+			.addValidator(new HL7QueryValidator())
+			.build();
+	}
+
+	/**
+	 * @return a new list holding the single static property, {@link #CHARACTER_SET}
+	 */
+	@Override
+	protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+		final List<PropertyDescriptor> properties = new ArrayList<>();
+		properties.add(CHARACTER_SET);
+		return properties;
+	}
+
+	/**
+	 * Keeps the query map in step with the dynamic properties: a removed property drops its
+	 * relationship, a set property (re)compiles its query. Non-dynamic properties are ignored.
+	 *
+	 * @param descriptor the property that changed
+	 * @param oldValue the previous value (unused)
+	 * @param newValue the new value, or {@code null} when the property was removed
+	 */
+	@Override
+	public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+		if ( !descriptor.isDynamic() ) {
+			return;
+		}
+
+		final Map<Relationship, HL7Query> updatedQueryMap = new HashMap<>(queries);
+		final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();
+
+		if ( newValue == null ) {
+			updatedQueryMap.remove(relationship);
+		} else {
+			// NOTE(review): compile() is called on the raw value here; confirm how the framework
+			// handles an exception from this callback when the value is not a valid query.
+			final HL7Query query = HL7Query.compile(newValue);
+			updatedQueryMap.put(relationship, query);
+		}
+
+		this.queries = updatedQueryMap;
+	}
+
+	/**
+	 * @return a new set holding every query-defined relationship plus {@link #REL_FAILURE} and
+	 *         {@link #REL_ORIGINAL}
+	 */
+	@Override
+	public Set<Relationship> getRelationships() {
+		final Set<Relationship> relationships = new HashSet<>(queries.keySet());
+		relationships.add(REL_FAILURE);
+		relationships.add(REL_ORIGINAL);
+		return relationships;
+	}
+
+	/**
+	 * Takes one FlowFile from the session, parses its content as HL7 and evaluates every configured
+	 * query against it, cloning the FlowFile to the relationship of each matching query.
+	 *
+	 * @param context supplies the {@link #CHARACTER_SET} property, evaluated against the FlowFile
+	 * @param session used to obtain, read, clone and transfer FlowFiles
+	 * @throws ProcessException declared by the processor contract; not thrown directly by this method
+	 */
+	@Override
+	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+		FlowFile flowFile = session.get();
+		if ( flowFile == null ) {
+			return;
+		}
+
+		final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue());
+
+		// The whole content is buffered in memory; the cast assumes the size fits in an int.
+		final byte[] buffer = new byte[(int) flowFile.getSize()];
+		session.read(flowFile, new InputStreamCallback() {
+			@Override
+			public void process(final InputStream in) throws IOException {
+				StreamUtils.fillBuffer(in, buffer);
+			}
+		});
+
+		@SuppressWarnings("resource")
+		final HapiContext hapiContext = new DefaultHapiContext();
+		hapiContext.setValidationContext(ValidationContextFactory.noValidation());
+
+		final PipeParser parser = hapiContext.getPipeParser();
+		final String hl7Text = new String(buffer, charset);
+		final HL7Message message;
+		try {
+			final Message hapiMessage = parser.parse(hl7Text);
+			message = new HapiMessage(hapiMessage);
+		} catch (final Exception e) {
+			getLogger().error("Failed to parse {} as HL7 due to {}; routing to failure", new Object[] {flowFile, e});
+			session.transfer(flowFile, REL_FAILURE);
+			return;
+		}
+
+		final Set<String> matchingRels = new HashSet<>();
+		// Read the volatile field once so the loop sees a single consistent map.
+		final Map<Relationship, HL7Query> queryMap = queries;
+		for ( final Map.Entry<Relationship, HL7Query> entry : queryMap.entrySet() ) {
+			final Relationship relationship = entry.getKey();
+			final HL7Query query = entry.getValue();
+
+			final QueryResult result = query.evaluate(message);
+			if ( result.isMatch() ) {
+				FlowFile clone = session.clone(flowFile);
+				clone = session.putAttribute(clone, ROUTE_ATTRIBUTE, relationship.getName());
+				session.transfer(clone, relationship);
+				session.getProvenanceReporter().route(clone, relationship);
+				matchingRels.add(relationship.getName());
+			}
+		}
+
+		session.transfer(flowFile, REL_ORIGINAL);
+		getLogger().info("Routed a copy of {} to {} relationships: {}", new Object[] {flowFile, matchingRels.size(), matchingRels});
+	}
+
+	/**
+	 * Accepts a property value only if it compiles as an HL7 query whose result is exactly one
+	 * element assignable to {@link HL7Message}.
+	 */
+	private static class HL7QueryValidator implements Validator {
+
+		/**
+		 * @param subject the name of the property being validated
+		 * @param input the candidate query text
+		 * @param context the validation context (unused)
+		 * @return a valid result, or an invalid one explaining the parse error or the wrong return type
+		 */
+		@Override
+		public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+			String error = null;
+
+			try {
+				final HL7Query hl7Query = HL7Query.compile(input);
+				final List<Class<?>> returnTypes = hl7Query.getReturnTypes();
+				// The size check runs first, so get(0) is only reached when exactly one type exists.
+				if ( returnTypes.size() != 1 || !HL7Message.class.isAssignableFrom(returnTypes.get(0)) ) {
+					error = "RouteHL7 requires that the HL7 Query return exactly 1 element of type MESSAGE";
+				}
+			} catch (final HL7QueryParsingException e) {
+				error = e.toString();
+			}
+
+			if ( error == null ) {
+				return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+			} else {
+				return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(error).build();
+			}
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..3f57ff0
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.hl7.ExtractHL7Attributes
+org.apache.nifi.processors.hl7.RouteHL7
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java
new file mode 100644
index 0000000..f566288
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hl7;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.nifi.processors.hl7.ExtractHL7Attributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link ExtractHL7Attributes}: runs the processor once over the sample message in
+ * {@code src/test/resources/1.hl7}, checks that the single FlowFile is routed to success, and
+ * prints the resulting attributes in sorted order for manual inspection.
+ */
+public class TestExtractHL7Attributes {
+
+	@Test
+	public void testExtract() throws IOException {
+		// Turn on DEBUG output for NiFi loggers so the processor's debug log line is visible.
+		System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG");
+
+		final TestRunner testRunner = TestRunners.newTestRunner(ExtractHL7Attributes.class);
+		testRunner.enqueue(Paths.get("src/test/resources/1.hl7"));
+		testRunner.run();
+
+		testRunner.assertAllFlowFilesTransferred(ExtractHL7Attributes.REL_SUCCESS, 1);
+
+		final MockFlowFile result = testRunner.getFlowFilesForRelationship(ExtractHL7Attributes.REL_SUCCESS).get(0);
+		// A TreeMap orders the attributes by name so the printed listing is stable.
+		final SortedMap<String, String> attributesByName = new TreeMap<>(result.getAttributes());
+		for (final Map.Entry<String, String> attribute : attributesByName.entrySet()) {
+			final String name = attribute.getKey();
+			final String value = attribute.getValue();
+			System.out.println(name + " : " + value);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/1.hl7
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/1.hl7 b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/1.hl7
new file mode 100644
index 0000000..bf2b8a5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/1.hl7
@@ -0,0 +1,16 @@
+MSH|^~`&|ECG REPORTING|ROCHESTER|ERIS|ROCHESTER|20110621050440||ORU^R01|20110621050440|P|2.1
+PID|||999999999||TEST^PATIENT||18450101|F
+OBR|||211088491|0^ADULT^ROCHECG|||20110620170631|||||||||M999999^^^^^^^RACFID||||||20110621060232||EC|F|||||||M999999^LASTNAME MD^FIRSTNAME^^^^^RACFID
+OBX||ST|93000.2^VENTRICULAR RATE EKG/MIN^CPT4|1|52|/SEC
+OBX||ST|93000.4^PR INTERVAL(MSEC)^CPT4|2|208|MSEC
+OBX||ST|93000.5^QRS - INTERVAL(MSEC)^CPT4|3|88|MSEC
+OBX||ST|93000.6^QT - INTERVAL(MSEC)^CPT4|4|466|MSEC
+OBX||ST|93000&PTL^PHYSICAL TEST LOCATION^CPT4|5|STMA
+OBX||ST|93000&PTR^PHYSICAL TEST ROOM^CPT4|6|04254
+OBX||CE|93000.17^^CPT4|7|21&101^Sinus bradycardia`T`with 1st degree A-V block^MEIECG
+OBX||CE|93000.17^^CPT4|8|1687^Otherwise normal ECG^MEIECG
+OBX||CE|93000&CMP^^CPT4|9|1301^When compared with ECG of^MEIECG
+OBX||TS|93000&CMD^EKG COMPARISON DATE^CPT4|10|201106171659
+OBX||CE|93000&CMP^^CPT4|11|1305^No significant change was found^MEIECG
+OBX||TX|93000.48^EKG COMMENT^CPT4|12|9917^LASTNAME MD^FIRSTNAME
+OBX||FT|93000^ECG 12-LEAD^CPT4|13|{\rtf1\ansi \deff1\deflang1033\ {\fonttbl{\f1\fmodern\fcharset0 Courier;}{\f2\fmodern\fcharset0 Courier;}} \pard\plain \f1\fs18\par 20Jun2011 17:06\par VENTRICULAR RATE 52\par Sinus bradycardia with 1st degree A-V block\par Otherwise normal ECG\par When compared with ECG of 17-JUN-2011 16:59,\par No significant change was found\par 47507`S`'LASTNAME MD`S`'FIRSTNAME \par }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/hypoglycemia.hl7
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/hypoglycemia.hl7 b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/hypoglycemia.hl7
new file mode 100644
index 0000000..02e8967
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/resources/hypoglycemia.hl7
@@ -0,0 +1,5 @@
+MSH|^~\&|CERNER||PriorityHealth||||ORU^R01|Q479004375T431430612|P|2.3|
+PID|||001677980||SMITH^CURTIS||19680219|M||||||||||929645156318|123456789|
+PD1||||1234567890^LAST^FIRST^M^^^^^NPI|
+OBR|1|341856649^HNAM_ORDERID|000002006326002362|648088^Basic Metabolic Panel|||20061122151600|||||||||1620^Hooker^Robert^L||||||20061122154733|||F|||||||||||20061122140000|
+OBX|1|NM|GLU^Glucose Lvl|59|mg/dL|65-99^65^99|L|||F|||20061122154733|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/45416dc6/nifi/nifi-nar-bundles/nifi-hl7-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-hl7-bundle/pom.xml
new file mode 100644
index 0000000..3f9fbce
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.1.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-hl7-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-hl7-processors</module>
+        <module>nifi-hl7-nar</module>
+    </modules>
+
+</project>