You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/20 03:05:26 UTC
nifi git commit: NIFI-3948: This closes #1834. Added flush() method to RecordWriter and call it when writing a single record to OutputStream for PublishKafkaRecord. Also removed no-longer-used class WriteAvroResult
Repository: nifi
Updated Branches:
refs/heads/master 58ce52d5d -> c49933f03
NIFI-3948: This closes #1834. Added flush() method to RecordWriter and call it when writing a single record to OutputStream for PublishKafkaRecord. Also removed no-longer-used class WriteAvroResult
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c49933f0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c49933f0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c49933f0
Branch: refs/heads/master
Commit: c49933f03df295760e1b6764e38dddbc9b2d31e6
Parents: 58ce52d
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri May 19 20:52:26 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri May 19 23:05:04 2017 -0400
----------------------------------------------------------------------
.../serialization/AbstractRecordSetWriter.java | 5 ++
.../apache/nifi/serialization/RecordWriter.java | 7 +++
.../serialization/record/MockRecordWriter.java | 5 ++
.../processors/kafka/pubsub/PublisherLease.java | 1 +
.../kafka/pubsub/util/MockRecordWriter.java | 6 ++
.../groovy/test_record_writer_inline.groovy | 4 ++
.../processors/standard/TestQueryRecord.java | 6 ++
.../org/apache/nifi/avro/WriteAvroResult.java | 63 --------------------
.../avro/WriteAvroResultWithExternalSchema.java | 9 ++-
.../nifi/avro/WriteAvroResultWithSchema.java | 5 ++
.../org/apache/nifi/csv/WriteCSVResult.java | 5 ++
.../org/apache/nifi/json/WriteJsonResult.java | 7 +++
.../apache/nifi/text/FreeFormTextWriter.java | 7 +--
13 files changed, 61 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
index 6bf574f..6ce9138 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -40,6 +40,11 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
}
@Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
public WriteResult write(final RecordSet recordSet) throws IOException {
beginRecordSet();
Record record;
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
index 720953c..6c21a39 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -37,4 +37,11 @@ public interface RecordWriter extends Closeable {
* the mime.type attribute.
*/
String getMimeType();
+
+ /**
+ * Flushes any buffered data to the underlying storage mechanism
+ *
+ * @throws IOException if unable to write to the underlying storage mechanism
+ */
+ void flush() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index 1d6aafe..891bbe3 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -66,6 +66,11 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
private boolean headerWritten = false;
@Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
public WriteResult write(final RecordSet rs) throws IOException {
if (header != null && !headerWritten) {
out.write(header.getBytes());
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 4b3a3ae..66641df 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -113,6 +113,7 @@ public class PublisherLease implements Closeable {
recordCount++;
baos.reset();
writer.write(record);
+ writer.flush();
final byte[] messageContent = baos.toByteArray();
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index 27df57b..1549626 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -60,6 +60,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
return new RecordSetWriter() {
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
@Override
public WriteResult write(final RecordSet rs) throws IOException {
out.write(header.getBytes());
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
index b0daaca..c961171 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
@@ -93,6 +93,10 @@ class GroovyRecordSetWriter implements RecordSetWriter {
@Override
public void close() throws IOException {
}
+
+ @Override
+ public void flush() throws IOException {
+ }
}
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index c00eb4b..c6035f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -271,6 +271,12 @@ public class TestQueryRecord {
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
return new RecordSetWriter() {
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
@Override
public WriteResult write(final RecordSet rs) throws IOException {
final int colCount = rs.getSchema().getFieldCount();
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
deleted file mode 100644
index 799d3ee..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.Record;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-
-public abstract class WriteAvroResult implements RecordSetWriter {
- private final Schema schema;
- private final OutputStream out;
-
- public WriteAvroResult(final Schema schema, final OutputStream out) {
- this.schema = schema;
- this.out = out;
- }
-
- protected Schema getSchema() {
- return schema;
- }
-
- @Override
- public WriteResult write(final Record record) throws IOException {
- final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
-
- final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
- try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
- dataFileWriter.create(schema, out);
- dataFileWriter.append(rec);
- }
-
- return WriteResult.of(1, Collections.emptyMap());
- }
-
- @Override
- public String getMimeType() {
- return "application/avro-binary";
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index c1f000b..25d494e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -61,8 +61,7 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
@Override
protected Map<String, String> onFinishRecordSet() throws IOException {
- encoder.flush();
- buffered.flush();
+ flush();
return schemaAccessWriter.getAttributes(recordSchema);
}
@@ -74,6 +73,12 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
}
@Override
+ public void flush() throws IOException {
+ encoder.flush();
+ buffered.flush();
+ }
+
+ @Override
public String getMimeType() {
return "application/avro-binary";
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index dd15118..ae2f109 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -49,6 +49,11 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
}
@Override
+ public void flush() throws IOException {
+ dataFileWriter.flush();
+ }
+
+ @Override
public Map<String, String> writeRecord(final Record record) throws IOException {
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
dataFileWriter.append(rec);
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index f8998f9..34a51ba 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -89,6 +89,11 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
}
@Override
+ public void flush() throws IOException {
+ printer.flush();
+ }
+
+ @Override
public Map<String, String> writeRecord(final Record record) throws IOException {
int i = 0;
for (final RecordField recordField : recordSchema.getFields()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index a41412f..8acaa04 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -96,6 +96,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
}
@Override
+ public void flush() throws IOException {
+ if (generator != null) {
+ generator.flush();
+ }
+ }
+
+ @Override
public Map<String, String> writeRecord(final Record record) throws IOException {
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
return schemaAccess.getAttributes(recordSchema);
http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
index 7012504..f22f592 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
@@ -17,6 +17,7 @@
package org.apache.nifi.text;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@@ -37,13 +38,11 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
private static final byte NEW_LINE = (byte) '\n';
private final PropertyValue propertyValue;
private final Charset charset;
- private final OutputStream out;
public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet, final OutputStream out) {
- super(out);
+ super(new BufferedOutputStream(out));
this.propertyValue = textPropertyValue;
this.charset = characterSet;
- this.out = out;
}
private List<String> getColumnNames(final RecordSchema schema) {
@@ -60,7 +59,7 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
- write(record, out, getColumnNames(record.getSchema()));
+ write(record, getOutputStream(), getColumnNames(record.getSchema()));
return Collections.emptyMap();
}