You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/20 03:05:26 UTC

nifi git commit: NIFI-3948: This closes #1834. Added flush() method to RecordWriter and call it when writing a single record to OutputStream for PublishKafkaRecord. Also removed no-longer-used class WriteAvroResult

Repository: nifi
Updated Branches:
  refs/heads/master 58ce52d5d -> c49933f03


NIFI-3948: This closes #1834. Added flush() method to RecordWriter and call it when writing a single record to OutputStream for PublishKafkaRecord. Also removed no-longer-used class WriteAvroResult

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c49933f0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c49933f0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c49933f0

Branch: refs/heads/master
Commit: c49933f03df295760e1b6764e38dddbc9b2d31e6
Parents: 58ce52d
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri May 19 20:52:26 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri May 19 23:05:04 2017 -0400

----------------------------------------------------------------------
 .../serialization/AbstractRecordSetWriter.java  |  5 ++
 .../apache/nifi/serialization/RecordWriter.java |  7 +++
 .../serialization/record/MockRecordWriter.java  |  5 ++
 .../processors/kafka/pubsub/PublisherLease.java |  1 +
 .../kafka/pubsub/util/MockRecordWriter.java     |  6 ++
 .../groovy/test_record_writer_inline.groovy     |  4 ++
 .../processors/standard/TestQueryRecord.java    |  6 ++
 .../org/apache/nifi/avro/WriteAvroResult.java   | 63 --------------------
 .../avro/WriteAvroResultWithExternalSchema.java |  9 ++-
 .../nifi/avro/WriteAvroResultWithSchema.java    |  5 ++
 .../org/apache/nifi/csv/WriteCSVResult.java     |  5 ++
 .../org/apache/nifi/json/WriteJsonResult.java   |  7 +++
 .../apache/nifi/text/FreeFormTextWriter.java    |  7 +--
 13 files changed, 61 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
index 6bf574f..6ce9138 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -40,6 +40,11 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
     }
 
     @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    @Override
     public WriteResult write(final RecordSet recordSet) throws IOException {
         beginRecordSet();
         Record record;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
index 720953c..6c21a39 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/RecordWriter.java
@@ -37,4 +37,11 @@ public interface RecordWriter extends Closeable {
      *         the mime.type attribute.
      */
     String getMimeType();
+
+    /**
+     * Flushes any buffered data to the underlying storage mechanism
+     *
+     * @throws IOException if unable to write to the underlying storage mechanism
+     */
+    void flush() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index 1d6aafe..891bbe3 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -66,6 +66,11 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
             private boolean headerWritten = false;
 
             @Override
+            public void flush() throws IOException {
+                out.flush();
+            }
+
+            @Override
             public WriteResult write(final RecordSet rs) throws IOException {
                 if (header != null && !headerWritten) {
                     out.write(header.getBytes());

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 4b3a3ae..66641df 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -113,6 +113,7 @@ public class PublisherLease implements Closeable {
                 recordCount++;
                 baos.reset();
                 writer.write(record);
+                writer.flush();
 
                 final byte[] messageContent = baos.toByteArray();
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index 27df57b..1549626 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -60,6 +60,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
     @Override
     public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
         return new RecordSetWriter() {
+
+            @Override
+            public void flush() throws IOException {
+                out.flush();
+            }
+
             @Override
             public WriteResult write(final RecordSet rs) throws IOException {
                 out.write(header.getBytes());

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
index b0daaca..c961171 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy
@@ -93,6 +93,10 @@ class GroovyRecordSetWriter implements RecordSetWriter {
     @Override
     public void close() throws IOException {
     }
+    
+    @Override
+    public void flush() throws IOException {
+    }
 }
 
 class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index c00eb4b..c6035f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -271,6 +271,12 @@ public class TestQueryRecord {
         @Override
         public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
             return new RecordSetWriter() {
+
+                @Override
+                public void flush() throws IOException {
+                    out.flush();
+                }
+
                 @Override
                 public WriteResult write(final RecordSet rs) throws IOException {
                     final int colCount = rs.getSchema().getFieldCount();

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
deleted file mode 100644
index 799d3ee..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.avro;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.Record;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collections;
-
-public abstract class WriteAvroResult implements RecordSetWriter {
-    private final Schema schema;
-    private final OutputStream out;
-
-    public WriteAvroResult(final Schema schema, final OutputStream out) {
-        this.schema = schema;
-        this.out = out;
-    }
-
-    protected Schema getSchema() {
-        return schema;
-    }
-
-    @Override
-    public WriteResult write(final Record record) throws IOException {
-        final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
-
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
-            dataFileWriter.create(schema, out);
-            dataFileWriter.append(rec);
-        }
-
-        return WriteResult.of(1, Collections.emptyMap());
-    }
-
-    @Override
-    public String getMimeType() {
-        return "application/avro-binary";
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index c1f000b..25d494e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -61,8 +61,7 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
 
     @Override
     protected Map<String, String> onFinishRecordSet() throws IOException {
-        encoder.flush();
-        buffered.flush();
+        flush();
         return schemaAccessWriter.getAttributes(recordSchema);
     }
 
@@ -74,6 +73,12 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
     }
 
     @Override
+    public void flush() throws IOException {
+        encoder.flush();
+        buffered.flush();
+    }
+
+    @Override
     public String getMimeType() {
         return "application/avro-binary";
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index dd15118..ae2f109 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -49,6 +49,11 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
     }
 
     @Override
+    public void flush() throws IOException {
+        dataFileWriter.flush();
+    }
+
+    @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, schema);
         dataFileWriter.append(rec);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index f8998f9..34a51ba 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -89,6 +89,11 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
     }
 
     @Override
+    public void flush() throws IOException {
+        printer.flush();
+    }
+
+    @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
         int i = 0;
         for (final RecordField recordField : recordSchema.getFields()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index a41412f..8acaa04 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -96,6 +96,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
     }
 
     @Override
+    public void flush() throws IOException {
+        if (generator != null) {
+            generator.flush();
+        }
+    }
+
+    @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
         return schemaAccess.getAttributes(recordSchema);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c49933f0/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
index 7012504..f22f592 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.text;
 
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
@@ -37,13 +38,11 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
     private static final byte NEW_LINE = (byte) '\n';
     private final PropertyValue propertyValue;
     private final Charset charset;
-    private final OutputStream out;
 
     public FreeFormTextWriter(final PropertyValue textPropertyValue, final Charset characterSet, final OutputStream out) {
-        super(out);
+        super(new BufferedOutputStream(out));
         this.propertyValue = textPropertyValue;
         this.charset = characterSet;
-        this.out = out;
     }
 
     private List<String> getColumnNames(final RecordSchema schema) {
@@ -60,7 +59,7 @@ public class FreeFormTextWriter extends AbstractRecordSetWriter implements Recor
 
     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
-        write(record, out, getColumnNames(record.getSchema()));
+        write(record, getOutputStream(), getColumnNames(record.getSchema()));
         return Collections.emptyMap();
     }