You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/19 06:10:06 UTC
[4/6] nifi git commit: NIFI-3863: Initial implementation of Lookup
Services. Implemented LookupRecord processors. This required some refactoring
of RecordSetWriter interface,
so refactored that interface and all implementations and references of it
NIFI-3863: Initial implementation of Lookup Services. Implemented LookupRecord processors. This required some refactoring of RecordSetWriter interface, so refactored that interface and all implementations and references of it
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9bd0246a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9bd0246a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9bd0246a
Branch: refs/heads/master
Commit: 9bd0246a96cfcda01b972834823250a9cddf77a8
Parents: 3b98abb
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed May 10 13:09:45 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri May 19 01:02:41 2017 -0400
----------------------------------------------------------------------
.../controller/AbstractControllerService.java | 28 ++
nifi-assembly/pom.xml | 5 +
.../nifi/record/path/StandardFieldValue.java | 7 +-
.../serialization/AbstractRecordSetWriter.java | 105 ++++++
.../nifi/serialization/RecordSetWriter.java | 21 +-
.../apache/nifi/serialization/RecordWriter.java | 7 +-
.../hadoop/AbstractFetchHDFSRecord.java | 11 +-
.../serialization/record/MockRecordParser.java | 5 +-
.../serialization/record/MockRecordWriter.java | 66 ++--
.../repository/StandardProcessSession.java | 12 +-
.../processors/kafka/pubsub/ConsumerLease.java | 32 +-
.../kafka/pubsub/PublishKafkaRecord_0_10.java | 14 +-
.../processors/kafka/pubsub/PublisherLease.java | 14 +-
.../pubsub/TestPublishKafkaRecord_0_10.java | 19 +-
.../kafka/pubsub/util/MockRecordWriter.java | 19 +-
.../processors/parquet/FetchParquetTest.java | 4 +-
.../services/AvroSchemaValidator.java | 5 +-
.../record/script/ScriptedRecordSetWriter.java | 9 +-
.../script/ScriptedRecordSetWriterTest.groovy | 7 +-
.../groovy/test_record_writer_inline.groovy | 35 +-
.../nifi-standard-processors/pom.xml | 4 +
.../standard/AbstractRecordProcessor.java | 33 +-
.../standard/AbstractRouteRecord.java | 223 +++++++++++++
.../nifi/processors/standard/LookupRecord.java | 208 ++++++++++++
.../nifi/processors/standard/QueryRecord.java | 29 +-
.../nifi/processors/standard/SplitRecord.java | 31 +-
.../org.apache.nifi.processor.Processor | 1 +
.../processors/standard/TestLookupRecord.java | 232 +++++++++++++
.../processors/standard/TestQueryRecord.java | 20 +-
.../nifi-lookup-service-api/pom.xml | 36 ++
.../nifi/lookup/LookupFailureException.java | 37 +++
.../org/apache/nifi/lookup/LookupService.java | 41 +++
.../apache/nifi/lookup/RecordLookupService.java | 41 +++
.../apache/nifi/lookup/StringLookupService.java | 39 +++
.../nifi-lookup-services-nar/pom.xml | 35 ++
.../src/main/resources/META-INF/NOTICE | 73 ++++
.../nifi-lookup-services/pom.xml | 51 +++
.../lookup/SimpleKeyValueLookupService.java | 58 ++++
.../nifi/lookup/maxmind/AnonymousIpSchema.java | 35 ++
.../apache/nifi/lookup/maxmind/CitySchema.java | 55 +++
.../nifi/lookup/maxmind/ContainerSchema.java | 37 +++
.../nifi/lookup/maxmind/DatabaseReader.java | 252 ++++++++++++++
.../nifi/lookup/maxmind/IPLookupService.java | 332 +++++++++++++++++++
.../apache/nifi/lookup/maxmind/IspSchema.java | 34 ++
...org.apache.nifi.controller.ControllerService | 17 +
.../additionalDetails.html | 102 ++++++
.../nifi-lookup-services-bundle/pom.xml | 28 ++
.../serialization/RecordSetWriterFactory.java | 5 +-
.../apache/nifi/avro/AvroRecordSetWriter.java | 8 +-
.../org/apache/nifi/avro/WriteAvroResult.java | 14 +-
.../avro/WriteAvroResultWithExternalSchema.java | 77 ++---
.../nifi/avro/WriteAvroResultWithSchema.java | 63 ++--
.../org/apache/nifi/csv/CSVRecordSetWriter.java | 6 +-
.../org/apache/nifi/csv/WriteCSVResult.java | 89 ++---
.../apache/nifi/json/JsonRecordSetWriter.java | 6 +-
.../org/apache/nifi/json/WriteJsonResult.java | 75 ++---
.../nifi/text/FreeFormTextRecordSetWriter.java | 5 +-
.../apache/nifi/text/FreeFormTextWriter.java | 36 +-
.../apache/nifi/avro/TestWriteAvroResult.java | 27 +-
.../avro/TestWriteAvroResultWithSchema.java | 6 +-
.../avro/TestWriteAvroResultWithoutSchema.java | 6 +-
.../org/apache/nifi/csv/TestWriteCSVResult.java | 57 ++--
.../apache/nifi/json/TestWriteJsonResult.java | 37 +--
.../nifi-standard-services-api-nar/pom.xml | 5 +
nifi-nar-bundles/nifi-standard-services/pom.xml | 2 +
pom.xml | 16 +
66 files changed, 2619 insertions(+), 430 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
index 6a27761..9762f3e 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
@@ -16,7 +16,11 @@
*/
package org.apache.nifi.controller;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessorInitializationContext;
@@ -28,6 +32,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
private ControllerServiceLookup serviceLookup;
private ComponentLog logger;
private StateManager stateManager;
+ private volatile ConfigurationContext configurationContext;
@Override
public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
@@ -75,4 +80,27 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
protected StateManager getStateManager() {
return stateManager;
}
+
+ @OnEnabled
+ public final void abstractStoreConfigContext(final ConfigurationContext configContext) {
+ this.configurationContext = configContext;
+ }
+
+ @OnDisabled
+ public final void abstractClearConfigContext() {
+ this.configurationContext = null;
+ }
+
+ protected ConfigurationContext getConfigurationContext() {
+ final ConfigurationContext context = this.configurationContext;
+ if (context == null) {
+ throw new IllegalStateException("No Configuration Context exists");
+ }
+
+ return configurationContext;
+ }
+
+ protected PropertyValue getProperty(final PropertyDescriptor descriptor) {
+ return getConfigurationContext().getProperty(descriptor);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 18cc689..5c7bff7 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -198,6 +198,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-lookup-services-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-poi-nar</artifactId>
<type>nar</type>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
index 4447fed..b02deb4 100644
--- a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
+++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/StandardFieldValue.java
@@ -103,7 +103,12 @@ public class StandardFieldValue implements FieldValue {
public void updateValue(final Object newValue) {
final Optional<Record> parentRecord = getParentRecord();
if (!parentRecord.isPresent()) {
- throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record");
+ if (value instanceof Record) {
+ ((Record) value).setValue(getField().getFieldName(), newValue);
+ return;
+ } else {
+ throw new UnsupportedOperationException("Cannot update the field value because the value is not associated with any record");
+ }
}
parentRecord.get().setValue(getField().getFieldName(), newValue);
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
new file mode 100644
index 0000000..5feb264
--- /dev/null
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+public abstract class AbstractRecordSetWriter implements RecordSetWriter {
+ private final OutputStream out;
+ private int recordCount = 0;
+ private boolean activeRecordSet = false;
+
+ public AbstractRecordSetWriter(final OutputStream out) {
+ this.out = out;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.out.close();
+ }
+
+ @Override
+ public WriteResult write(final RecordSet recordSet) throws IOException {
+ beginRecordSet();
+ Record record;
+ while ((record = recordSet.next()) != null) {
+ write(record);
+ recordCount++;
+ }
+ return finishRecordSet();
+ }
+
+ protected OutputStream getOutputStream() {
+ return out;
+ }
+
+ protected final int getRecordCount() {
+ return recordCount;
+ }
+
+ protected final boolean isRecordSetActive() {
+ return activeRecordSet;
+ }
+
+ @Override
+ public final void beginRecordSet() throws IOException {
+ if (activeRecordSet) {
+ throw new IllegalStateException("Cannot begin a RecordSet because a RecordSet has already begun");
+ }
+
+ activeRecordSet = true;
+ onBeginRecordSet();
+ }
+
+ @Override
+ public final WriteResult finishRecordSet() throws IOException {
+ if (!isRecordSetActive()) {
+ throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun");
+ }
+
+ final Map<String, String> attributes = onFinishRecordSet();
+ return WriteResult.of(recordCount, attributes == null ? Collections.emptyMap() : attributes);
+ }
+
+ /**
+ * Method that is called as a result of {@link #beginRecordSet()} being called. This gives subclasses
+ * the chance to react to a new RecordSet beginning but prevents the subclass from changing how this
+ * implementation maintains its internal state. By default, this method does nothing.
+ *
+ * @throws IOException if unable to write the necessary data for a new RecordSet
+ */
+ protected void onBeginRecordSet() throws IOException {
+ }
+
+ /**
+ * Method that is called by {@link #finishRecordSet()} when a RecordSet is finished. This gives subclasses
+ * the chance to react to a RecordSet being completed but prevents the subclass from changing how this
+ * implementation maintains its internal state.
+ *
+ * @return a Map of key/value pairs that should be added to the FlowFile as attributes
+ */
+ protected Map<String, String> onFinishRecordSet() throws IOException {
+ return Collections.emptyMap();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
index 7d6fa1c..7c29cfe 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordSetWriter.java
@@ -37,9 +37,26 @@ public interface RecordSetWriter extends RecordWriter {
* Writes the given result set to the given output stream
*
* @param recordSet the record set to serialize
- * @param out the OutputStream to write to
+ *
+ * @return the results of writing the data
+ * @throws IOException if unable to write to the given OutputStream
+ */
+ WriteResult write(RecordSet recordSet) throws IOException;
+
+ /**
+ * Begins a new RecordSet
+ *
+ * @throws IOException if unable to write to the underlying OutputStream
+ * @throws IllegalStateException if a RecordSet has already been started
+ */
+ void beginRecordSet() throws IOException;
+
+ /**
+ * Finishes the currently active RecordSet and returns a WriteResult that includes information about what was written
+ *
* @return the results of writing the data
* @throws IOException if unable to write to the given OutputStream
+ * @throws IllegalStateException if a RecordSet has not been started via {@link #beginRecordSet()}
*/
- WriteResult write(RecordSet recordSet, OutputStream out) throws IOException;
+ WriteResult finishRecordSet() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
index aa298d9..720953c 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -17,21 +17,20 @@
package org.apache.nifi.serialization;
+import java.io.Closeable;
import java.io.IOException;
-import java.io.OutputStream;
import org.apache.nifi.serialization.record.Record;
-public interface RecordWriter {
+public interface RecordWriter extends Closeable {
/**
* Writes the given result set to the given output stream
*
* @param record the record set to serialize
- * @param out the OutputStream to write to
* @return the results of writing the data
* @throws IOException if unable to write to the given OutputStream
*/
- WriteResult write(Record record, OutputStream out) throws IOException;
+ WriteResult write(Record record) throws IOException;
/**
* @return the MIME Type that the Result Set Writer produces. This will be added to FlowFiles using
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 8883965..7cc6bb5 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -188,12 +188,14 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0));
- final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema);
final StopWatch stopWatch = new StopWatch(true);
// use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure
child = session.create(originalFlowFile);
+
+ final FlowFile writableFlowFile = child;
+ final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
child = session.write(child, (final OutputStream rawOut) -> {
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
@@ -212,7 +214,10 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
}
};
- writeResult.set(recordSetWriter.write(recordSet, out));
+ try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
+ writeResult.set(recordSetWriter.write(recordSet));
+ mimeTypeRef.set(recordSetWriter.getMimeType());
+ }
} catch (Exception e) {
exceptionHolder.set(e);
}
@@ -230,7 +235,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final Map<String,String> attributes = new HashMap<>(writeResult.get().getAttributes());
attributes.put(RECORD_COUNT_ATTR, String.valueOf(writeResult.get().getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(), recordSetWriter.getMimeType());
+ attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
successFlowFile = session.putAllAttributes(successFlowFile, attributes);
final URI uri = path.toUri();
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
index e3ed23e..251eb46 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
@@ -42,7 +42,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
- private final int failAfterN;
+ private int failAfterN;
public MockRecordParser() {
this(-1);
@@ -52,6 +52,9 @@ public class MockRecordParser extends AbstractControllerService implements Recor
this.failAfterN = failAfterN;
}
+ public void failAfter(final int failAfterN) {
+ this.failAfterN = failAfterN;
+ }
public void addSchemaField(final String fieldName, final RecordFieldType type) {
fields.add(new RecordField(fieldName, type.getDataType()));
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index b4253ee..525a51f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -36,6 +36,10 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
private final int failAfterN;
private final boolean quoteValues;
+ public MockRecordWriter() {
+ this(null);
+ }
+
public MockRecordWriter(final String header) {
this(header, true, -1);
}
@@ -56,12 +60,16 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
+ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
return new RecordSetWriter() {
+ private int recordCount = 0;
+
@Override
- public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
- out.write(header.getBytes());
- out.write("\n".getBytes());
+ public WriteResult write(final RecordSet rs) throws IOException {
+ if (header != null) {
+ out.write(header.getBytes());
+ out.write("\n".getBytes());
+ }
int recordCount = 0;
Record record = null;
@@ -75,14 +83,14 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
int i = 0;
for (final String fieldName : record.getSchema().getFieldNames()) {
final String val = record.getAsString(fieldName);
- if (quoteValues) {
- out.write("\"".getBytes());
- if (val != null) {
+ if (val != null) {
+ if (quoteValues) {
+ out.write("\"".getBytes());
+ out.write(val.getBytes());
+ out.write("\"".getBytes());
+ } else {
out.write(val.getBytes());
}
- out.write("\"".getBytes());
- } else if (val != null) {
- out.write(val.getBytes());
}
if (i++ < numCols - 1) {
@@ -101,20 +109,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public WriteResult write(Record record, OutputStream out) throws IOException {
- out.write(header.getBytes());
- out.write("\n".getBytes());
+ public WriteResult write(Record record) throws IOException {
+ if (header != null) {
+ out.write(header.getBytes());
+ out.write("\n".getBytes());
+ }
final int numCols = record.getSchema().getFieldCount();
int i = 0;
for (final String fieldName : record.getSchema().getFieldNames()) {
final String val = record.getAsString(fieldName);
- if (quoteValues) {
- out.write("\"".getBytes());
- out.write(val.getBytes());
- out.write("\"".getBytes());
- } else {
- out.write(val.getBytes());
+ if (val != null) {
+ if (quoteValues) {
+ out.write("\"".getBytes());
+ out.write(val.getBytes());
+ out.write("\"".getBytes());
+ } else {
+ out.write(val.getBytes());
+ }
}
if (i++ < numCols - 1) {
@@ -123,8 +135,24 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
out.write("\n".getBytes());
+ recordCount++;
+
return WriteResult.of(1, Collections.emptyMap());
}
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ @Override
+ public void beginRecordSet() throws IOException {
+ }
+
+ @Override
+ public WriteResult finishRecordSet() throws IOException {
+ return WriteResult.of(recordCount, Collections.emptyMap());
+ }
};
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index b307930..62326a2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2460,19 +2460,21 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ closed = true;
writeRecursionSet.remove(sourceFlowFile);
final long bytesWritten = countingOut.getBytesWritten();
- if (!closed) {
- StandardProcessSession.this.bytesWritten += bytesWritten;
- closed = true;
- }
+ StandardProcessSession.this.bytesWritten += bytesWritten;
openOutputStreams.remove(sourceFlowFile);
+ flush();
removeTemporaryClaim(record);
- flush();
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
.fromFlowFile(record.getCurrent())
.contentClaim(updatedClaim)
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 1ccce07..563ece6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -413,16 +413,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
FlowFile flowFile = session.create();
try {
- final RecordSetWriter writer;
+ final RecordSchema schema;
+
try {
- final RecordSchema schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
- writer = writerFactory.createWriter(logger, schema);
+ schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
} catch (final Exception e) {
- logger.error(
- "Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the "
- + "Record Writer cannot obtain the appropriate Schema, due to failure to connect to a remote Schema Registry "
- + "or due to the Schema Access Strategy being dependent upon FlowFile Attributes that are not available. "
- + "Will roll back the Kafka message offsets.", e);
+ logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
try {
rollback(topicPartition);
@@ -436,6 +432,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final FlowFile ff = flowFile;
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
+ final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
flowFile = session.write(flowFile, rawOut -> {
final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
@@ -479,15 +476,28 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
};
- try (final OutputStream out = new BufferedOutputStream(rawOut)) {
- writeResult.set(writer.write(recordSet, out));
+ try (final OutputStream out = new BufferedOutputStream(rawOut);
+ final RecordSetWriter writer = writerFactory.createWriter(logger, schema, ff, out)) {
+ writeResult.set(writer.write(recordSet));
+ mimeTypeRef.set(writer.getMimeType());
+ } catch (final Exception e) {
+ logger.error("Failed to write records to FlowFile. Will roll back the Kafka message offsets.", e);
+
+ try {
+ rollback(topicPartition);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+ }
+
+ yield();
+ throw new ProcessException(e);
}
});
final WriteResult result = writeResult.get();
if (result.getRecordCount() > 0) {
final Map<String, String> attributes = new HashMap<>(result.getAttributes());
- attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
attributes.put("record.count", String.valueOf(result.getRecordCount()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
index 442ccc5..e48568b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
@@ -58,7 +58,6 @@ import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RecordWriter;
import org.apache.nifi.serialization.record.RecordSchema;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
@@ -324,14 +323,13 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
- final RecordWriter writer;
- try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
- final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema schema = writerFactory.getSchema(flowFile, in);
+ final RecordSchema schema;
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- writer = writerFactory.createWriter(getLogger(), schema);
+ try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
+ schema = writerFactory.getSchema(flowFile, in);
} catch (final Exception e) {
- getLogger().error("Failed to create a Record Writer for {}; routing to failure", new Object[] {flowFile, e});
+ getLogger().error("Failed to determine Schema for writing messages to Kafka for {}; routing to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
continue;
}
@@ -342,7 +340,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger());
- lease.publish(flowFile, reader, writer, messageKeyField, topic);
+ lease.publish(flowFile, reader, writerFactory, schema, messageKeyField, topic);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index be2697b..4b3a3ae 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -32,9 +32,12 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
@@ -93,7 +96,8 @@ public class PublisherLease implements Closeable {
}
}
- void publish(final FlowFile flowFile, final RecordReader reader, final RecordWriter writer, final String messageKeyField, final String topic) throws IOException {
+ void publish(final FlowFile flowFile, final RecordReader reader, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
+ final String messageKeyField, final String topic) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker();
}
@@ -104,11 +108,11 @@ public class PublisherLease implements Closeable {
final RecordSet recordSet = reader.createRecordSet();
int recordCount = 0;
- try {
+ try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) {
while ((record = recordSet.next()) != null) {
recordCount++;
baos.reset();
- writer.write(record, baos);
+ writer.write(record);
final byte[] messageContent = baos.toByteArray();
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
@@ -127,6 +131,8 @@ public class PublisherLease implements Closeable {
}
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);
+ } catch (final SchemaNotFoundException snfe) {
+ throw new IOException(snfe);
} catch (final Exception e) {
tracker.fail(flowFile, e);
poison();
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
index 8c6efb7..7cff2a7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
@@ -44,8 +44,8 @@ import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -65,7 +65,8 @@ public class TestPublishKafkaRecord_0_10 {
public void setup() throws InitializationException, IOException {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
- Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), any(String.class), any(String.class));
+ Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class),
+ any(RecordSchema.class), any(String.class), any(String.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
@@ -103,7 +104,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -122,7 +123,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3);
- verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -137,7 +138,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -154,7 +155,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3);
- verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -176,7 +177,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
- verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
@@ -206,7 +207,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -240,7 +241,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
- verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index a1abda4..27df57b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -58,10 +58,10 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
+ public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
return new RecordSetWriter() {
@Override
- public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+ public WriteResult write(final RecordSet rs) throws IOException {
out.write(header.getBytes());
out.write("\n".getBytes());
@@ -102,7 +102,20 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public WriteResult write(Record record, OutputStream out) throws IOException {
+ public WriteResult write(Record record) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void beginRecordSet() throws IOException {
+ }
+
+ @Override
+ public WriteResult finishRecordSet() throws IOException {
return null;
}
};
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index 5b4819a..6ecfa59 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -217,11 +217,11 @@ public class FetchParquetTest {
configure(proc);
final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class);
- when(recordSetWriter.write(any(RecordSet.class), any(OutputStream.class))).thenThrow(new IOException("IOException"));
+ when(recordSetWriter.write(any(RecordSet.class))).thenThrow(new IOException("IOException"));
final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
- when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class))).thenReturn(recordSetWriter);
+ when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(FlowFile.class), any(OutputStream.class))).thenReturn(recordSetWriter);
testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory);
testRunner.enableControllerService(recordSetWriterFactory);
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
index 32b700f..904aab6 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaValidator.java
@@ -18,9 +18,11 @@
package org.apache.nifi.schemaregistry.services;
import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
public class AvroSchemaValidator implements Validator {
@@ -36,7 +38,8 @@ public class AvroSchemaValidator implements Validator {
}
try {
- new Schema.Parser().parse(input);
+ final Schema avroSchema = new Schema.Parser().parse(input);
+ AvroTypeUtil.createSchema(avroSchema, input, SchemaIdentifier.EMPTY);
return new ValidationResult.Builder()
.input(input)
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
index 1dde047..b18e9de 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
@@ -34,6 +34,7 @@ import javax.script.Invocable;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collection;
import java.util.HashSet;
@@ -52,15 +53,12 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
super.onEnabled(context);
}
- public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream in) throws SchemaNotFoundException, IOException {
- return createWriter(logger, getSchema(flowFile, in));
- }
@Override
- public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException {
+ public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
if (recordFactory.get() != null) {
try {
- return recordFactory.get().createWriter(logger, schema);
+ return recordFactory.get().createWriter(logger, schema, flowFile, out);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}
@@ -131,6 +129,7 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
}
} catch (final Exception ex) {
+ ex.printStackTrace();
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + ex.getLocalizedMessage();
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
index 2e1c03d..96fda19 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy
@@ -104,7 +104,9 @@ class ScriptedRecordSetWriterTest {
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
def schema = recordSetWriterFactory.getSchema(mockFlowFile, inStream)
- RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema)
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
+ RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream)
assertNotNull(recordSetWriter)
def recordSchema = new SimpleRecordSchema(
@@ -119,8 +121,7 @@ class ScriptedRecordSetWriterTest {
new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
] as MapRecord[]
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
- recordSetWriter.write(RecordSet.of(recordSchema, records), outputStream)
+ recordSetWriter.write(RecordSet.of(recordSchema, records))
def xml = new XmlSlurper().parseText(outputStream.toString())
assertEquals('1', xml.record[0].id.toString())
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
index 4fae4fe..b0daaca 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
@@ -34,9 +34,15 @@ import org.apache.nifi.stream.io.NonCloseableOutputStream
class GroovyRecordSetWriter implements RecordSetWriter {
-
+ private int recordCount = 0;
+ private final OutputStream out;
+
+ public GroovyRecordSetWriter(final OutputStream out) {
+ this.out = out;
+ }
+
@Override
- WriteResult write(Record r, OutputStream out) throws IOException {
+ WriteResult write(Record r) throws IOException {
new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
new MarkupBuilder(osw).record {
r.schema.fieldNames.each {fieldName ->
@@ -44,7 +50,9 @@ class GroovyRecordSetWriter implements RecordSetWriter {
}
}
}
- WriteResult.of(0, [:])
+
+ recordCount++;
+ WriteResult.of(1, [:])
}
@Override
@@ -53,10 +61,10 @@ class GroovyRecordSetWriter implements RecordSetWriter {
}
@Override
- WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
+ WriteResult write(final RecordSet rs) throws IOException {
int count = 0
- new OutputStreamWriter(new NonCloseableOutputStream(rawOut)).with {osw ->
+ new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
new MarkupBuilder(osw).recordSet {
Record r
@@ -73,6 +81,18 @@ class GroovyRecordSetWriter implements RecordSetWriter {
}
WriteResult.of(count, [:])
}
+
+ public void beginRecordSet() throws IOException {
+ }
+
+ @Override
+ public WriteResult finishRecordSet() throws IOException {
+ return WriteResult.of(recordCount, [:]);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
}
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
@@ -83,9 +103,10 @@ class GroovyRecordSetWriterFactory extends AbstractControllerService implements
}
@Override
- RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException {
- return new GroovyRecordSetWriter()
+ RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException {
+ return new GroovyRecordSetWriter(out)
}
+
}
writer = new GroovyRecordSetWriterFactory()
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index a86c836..a0932e0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -42,6 +42,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-lookup-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index b6cc83b..52fcbb8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -27,7 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@@ -101,26 +101,27 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSetWriter writer;
final RecordSchema writeSchema;
try (final InputStream rawIn = session.read(flowFile);
final InputStream in = new BufferedInputStream(rawIn)) {
writeSchema = writerFactory.getSchema(flowFile, in);
- writer = writerFactory.createWriter(getLogger(), writeSchema);
} catch (final Exception e) {
- getLogger().error("Failed to convert records for {}; will route to failure", new Object[] {flowFile, e});
+ getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
- final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
+ final Map<String, String> attributes = new HashMap<>();
+ final AtomicInteger recordCount = new AtomicInteger();
final FlowFile original = flowFile;
try {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
+
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+ final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out);
final RecordSet recordSet = new RecordSet() {
@Override
@@ -151,8 +152,11 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
}
};
- final WriteResult writeResult = writer.write(recordSet, out);
- writeResultRef.set(writeResult);
+ final WriteResult writeResult = writer.write(recordSet);
+ attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ recordCount.set(writeResult.getRecordCount());
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("Could not parse incoming data", e);
@@ -160,22 +164,17 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
}
});
} catch (final Exception e) {
- getLogger().error("Failed to convert {}", new Object[] {flowFile, e});
+ getLogger().error("Failed to process {}", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
- final WriteResult writeResult = writeResultRef.get();
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
-
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
- session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
- getLogger().info("Successfully converted {} records for {}", new Object[] {writeResult.getRecordCount(), flowFile});
+
+ final int count = recordCount.get();
+ session.adjustCounter("Records Processed", count, false);
+ getLogger().info("Successfully converted {} records for {}", new Object[] {count, flowFile});
}
protected abstract Record process(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context);
http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
new file mode 100644
index 0000000..955023f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+
+public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
+ static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for reading incoming data")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+ static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for writing out the records")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, "
+ + "the unchanged FlowFile will be routed to this relationship")
+ .build();
+ static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("Once a FlowFile has been processed and any derivative FlowFiles have been transferred, the original FlowFile will be transferred to this relationship.")
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(RECORD_READER);
+ properties.add(RECORD_WRITER);
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ if (isRouteOriginal()) {
+ relationships.add(REL_ORIGINAL);
+ }
+
+ relationships.add(REL_FAILURE);
+ return relationships;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final T flowFileContext = getFlowFileContext(flowFile, context);
+
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final RecordSchema writeSchema;
+ try (final InputStream rawIn = session.read(flowFile);
+ final InputStream in = new BufferedInputStream(rawIn)) {
+ writeSchema = writerFactory.getSchema(flowFile, in);
+ } catch (final Exception e) {
+ getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final AtomicInteger numRecords = new AtomicInteger(0);
+ final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
+ final FlowFile original = flowFile;
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+ final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
+ numRecords.incrementAndGet();
+
+ for (final Relationship relationship : relationships) {
+ final RecordSetWriter recordSetWriter;
+ Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
+ if (tuple == null) {
+ FlowFile outFlowFile = session.create(original);
+ final OutputStream out = session.write(outFlowFile);
+ recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, original, out);
+ recordSetWriter.beginRecordSet();
+
+ tuple = new Tuple<>(outFlowFile, recordSetWriter);
+ writers.put(relationship, tuple);
+ } else {
+ recordSetWriter = tuple.getValue();
+ }
+
+ recordSetWriter.write(record);
+ }
+ }
+ } catch (final SchemaNotFoundException | MalformedRecordException e) {
+ throw new ProcessException("Could not parse incoming data", e);
+ }
+ }
+ });
+
+ for (final Map.Entry<Relationship, Tuple<FlowFile, RecordSetWriter>> entry : writers.entrySet()) {
+ final Relationship relationship = entry.getKey();
+ final Tuple<FlowFile, RecordSetWriter> tuple = entry.getValue();
+ final RecordSetWriter writer = tuple.getValue();
+ FlowFile childFlowFile = tuple.getKey();
+
+ final WriteResult writeResult = writer.finishRecordSet();
+
+ try {
+ writer.close();
+ } catch (final IOException ioe) {
+ getLogger().warn("Failed to close Writer for {}", new Object[] {childFlowFile});
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+
+ childFlowFile = session.putAllAttributes(childFlowFile, attributes);
+ session.transfer(childFlowFile, relationship);
+ session.adjustCounter("Records Processed", writeResult.getRecordCount(), false);
+ session.adjustCounter("Records Routed to " + relationship.getName(), writeResult.getRecordCount(), false);
+
+ session.getProvenanceReporter().route(childFlowFile, relationship);
+ }
+ } catch (final Exception e) {
+ getLogger().error("Failed to process {}", new Object[] {flowFile, e});
+
+ for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
+ try {
+ tuple.getValue().close();
+ } catch (final Exception e1) {
+ getLogger().warn("Failed to close Writer for {}; some resources may not be cleaned up appropriately", new Object[] {tuple.getKey()});
+ }
+
+ session.remove(tuple.getKey());
+ }
+
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ } finally {
+ for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
+ final RecordSetWriter writer = tuple.getValue();
+ try {
+ writer.close();
+ } catch (final Exception e) {
+ getLogger().warn("Failed to close Record Writer for {}; some resources may not be properly cleaned up", new Object[] {tuple.getKey(), e});
+ }
+ }
+ }
+
+ if (isRouteOriginal()) {
+ flowFile = session.putAttribute(flowFile, "record.count", String.valueOf(numRecords));
+ session.transfer(flowFile, REL_ORIGINAL);
+ } else {
+ session.remove(flowFile);
+ }
+
+ getLogger().info("Successfully processed {}, creating {} derivative FlowFiles and processing {} records", new Object[] {flowFile, writers.size(), numRecords});
+ }
+
+ protected abstract Set<Relationship> route(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context, T flowFileContext);
+
+ protected abstract boolean isRouteOriginal();
+
+ protected abstract T getFlowFileContext(FlowFile flowFile, ProcessContext context);
+}