You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/18 03:55:29 UTC

nifi git commit: NIFI-3894: This closes #1820. Fixed close and consume order with compression.

Repository: nifi
Updated Branches:
  refs/heads/master 8e1c79eaa -> 36e7bd616


NIFI-3894: This closes #1820. Fixed close and consume order with compression.

Before this fix, 'NullPointerException: Inflater has been closed' can be thrown as the Inflater is closed before input stream is consumed.

Also, calling close from AbstractTransaction.receive is removed, because the DataPacket is exposed as its return value and this class will not be able to know when to close the stream.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/36e7bd61
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/36e7bd61
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/36e7bd61

Branch: refs/heads/master
Commit: 36e7bd6164f6d7294ec17fe6487a74b042ba2f25
Parents: 8e1c79e
Author: Koji Kawamura <ij...@apache.org>
Authored: Thu May 18 10:05:08 2017 +0900
Committer: joewitt <jo...@apache.org>
Committed: Wed May 17 22:57:33 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/remote/AbstractTransaction.java  |  5 -----
 .../remote/protocol/AbstractFlowFileServerProtocol.java   | 10 +++++-----
 2 files changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/36e7bd61/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
index 2d6b2e1..826cf00 100644
--- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
+++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
@@ -148,11 +148,6 @@ public abstract class AbstractTransaction implements Transaction {
                 final InputStream dataIn = compress ? new CompressionInputStream(is) : is;
                 final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
 
-                if (compress) {
-                    // Close CompressionInputStream to free acquired memory, without closing underlying stream.
-                    dataIn.close();
-                }
-
                 if (packet == null) {
                     this.dataAvailable = false;
                 } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/36e7bd61/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
index f539808..3a5f23c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
@@ -441,11 +441,6 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
 
             final DataPacket dataPacket = codec.decode(checkedInputStream);
 
-            if (handshakeProperties.isUseGzip()) {
-                // Close CompressionInputStream to free acquired memory, without closing underlying stream.
-                checkedInputStream.close();
-            }
-
             if (dataPacket == null) {
                 logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer);
                 break;
@@ -454,6 +449,11 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
             flowFile = session.importFrom(dataPacket.getData(), flowFile);
             flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
 
+            if (handshakeProperties.isUseGzip()) {
+                // Close CompressionInputStream to free acquired memory, without closing underlying stream.
+                checkedInputStream.close();
+            }
+
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
             final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());