You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2019/07/23 16:12:21 UTC

[asterixdb] branch master updated (81074ed -> 71e8ad0)

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git.


    from 81074ed  [ASTERIXDB-2599][STO] Cleanup compression LAFs
     new 272a180  [NO ISSUE][STO] Tolerate Corrupted System Checkpoint Files
     new 8c99391  [NO ISSUE][NET] Do Not Flush Partial Frames on Network Errors
     new 71e8ad0  Merge commit '8c99391' from stabilization-f69489

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../asterix/test/logging/CheckpointingTest.java    | 11 +++++--
 .../test/resources/cc-small-txn-log-partition.conf |  1 +
 .../api/cluster_state_1/cluster_state_1.1.regexadm |  2 +-
 .../cluster_state_1_full.1.regexadm                |  2 +-
 .../cluster_state_1_less.1.regexadm                |  2 +-
 .../common/config/TransactionProperties.java       |  2 +-
 .../recovery/AbstractCheckpointManager.java        | 37 +++++++++++++---------
 .../service/recovery/CheckpointThread.java         |  3 +-
 .../protocols/muxdemux/ChannelControlBlock.java    |  1 -
 9 files changed, 37 insertions(+), 24 deletions(-)


[asterixdb] 03/03: Merge commit '8c99391' from stabilization-f69489

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 71e8ad084dc1f2cced283a17dcc4539733459ade
Merge: 81074ed 8c99391
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Tue Jul 23 01:30:39 2019 +0300

    Merge commit '8c99391' from stabilization-f69489
    
    Change-Id: I139c265ba998c32ec049c8a8bbd7a5a213895d2a

 .../asterix/test/logging/CheckpointingTest.java    | 11 +++++--
 .../test/resources/cc-small-txn-log-partition.conf |  1 +
 .../api/cluster_state_1/cluster_state_1.1.regexadm |  2 +-
 .../cluster_state_1_full.1.regexadm                |  2 +-
 .../cluster_state_1_less.1.regexadm                |  2 +-
 .../common/config/TransactionProperties.java       |  2 +-
 .../recovery/AbstractCheckpointManager.java        | 37 +++++++++++++---------
 .../service/recovery/CheckpointThread.java         |  3 +-
 .../protocols/muxdemux/ChannelControlBlock.java    |  1 -
 9 files changed, 37 insertions(+), 24 deletions(-)

diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
index fb6ca6b,b294828..0477147
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
@@@ -55,13 -49,12 +55,13 @@@ public class TransactionProperties exte
                  "The checkpoint threshold (in terms of LSNs (log sequence numbers) that have been written to the "
                          + "transaction log, i.e., the length of the transaction log) for transaction logs"),
          TXN_LOG_CHECKPOINT_POLLFREQUENCY(
 -                INTEGER,
 +                POSITIVE_INTEGER,
                  120,
 -                "The frequency (in seconds) the checkpoint thread should check to see if a checkpoint should be written"),
 -        TXN_LOG_CHECKPOINT_HISTORY(INTEGER, 2, "The number of checkpoints to keep in the transaction log"),
 +                "The frequency (in seconds) the checkpoint thread should check to see if a checkpoint should be "
 +                        + "written"),
-         TXN_LOG_CHECKPOINT_HISTORY(UNSIGNED_INTEGER, 0, "The number of checkpoints to keep in the transaction log"),
++        TXN_LOG_CHECKPOINT_HISTORY(UNSIGNED_INTEGER, 2, "The number of checkpoints to keep in the transaction log"),
          TXN_LOCK_ESCALATIONTHRESHOLD(
 -                INTEGER,
 +                UNSIGNED_INTEGER,
                  1000,
                  "The maximum number of entity locks to obtain before upgrading to a dataset lock"),
          TXN_LOCK_SHRINKTIMER(
diff --cc asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index 446eec5,1965adc..cbc5e35
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@@ -85,13 -94,9 +85,12 @@@ public class CheckpointThread extends T
                      if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
                          lastCheckpointLSN = currentCheckpointAttemptMinLSN;
                      }
- 
 -                } catch (Exception e) {
 -                    LOGGER.log(Level.ERROR, "checkpoint attempt failed", e);
                  }
 +            } catch (InterruptedException e) {
 +                LOGGER.info("Checkpoint thread interrupted", e);
 +                Thread.currentThread().interrupt();
 +            } catch (Exception e) {
-                 LOGGER.error("Error during checkpoint", e);
++                LOGGER.error("checkpoint attempt failed", e);
              }
          }
      }


[asterixdb] 01/03: [NO ISSUE][STO] Tolerate Corrupted System Checkpoint Files

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 272a180c91c3b1afb12189c6cf7320ee313a9b03
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Mon Jul 15 17:34:53 2019 +0300

    [NO ISSUE][STO] Tolerate Corrupted System Checkpoint Files
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Do not delete corrupted checkpoint files until a valid
      checkpoint is persisted. This ensure a forged checkpoint
      that will force recovery to start from the begining is
      always used until a valid checkpoint is found.
    - Attempt to read the latest checkpoint file right after
      writing it and before attempting cleaning up invalid and
      old checkpoint files.
    - Use on disk files to determine next checkpoint id to
      account for existing corrupted checkpoints.
    - Maintain two older checkpoint files in addition to the
      latest one.
    - Catch all exceptions on checkpointing failures since it is
      a try operation.
    - Add test scenairo for the new checkpoints clean up behavior.
    
    Change-Id: Iea689f5a644351491d9748273bb2158e8179f54d
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3496
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Till Westmann <ti...@apache.org>
---
 .../asterix/test/logging/CheckpointingTest.java    | 11 ++++--
 .../test/resources/cc-small-txn-log-partition.conf |  1 +
 .../api/cluster_state_1/cluster_state_1.1.regexadm |  2 +-
 .../cluster_state_1_full.1.regexadm                |  2 +-
 .../cluster_state_1_less.1.regexadm                |  2 +-
 .../common/config/TransactionProperties.java       |  2 +-
 .../recovery/AbstractCheckpointManager.java        | 39 +++++++++++++---------
 .../service/recovery/CheckpointThread.java         |  4 +--
 8 files changed, 39 insertions(+), 24 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index e67246a..a68859c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -306,8 +306,8 @@ public class CheckpointingTest {
                 Checkpoint cpAfterCorruption = checkpointManager.getLatest();
                 // Make sure the valid checkpoint was returned
                 Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId());
-                // Make sure the corrupted checkpoint file was deleted
-                Assert.assertFalse(corruptedCheckpoint.exists());
+                // Make sure the corrupted checkpoint file was not deleted
+                Assert.assertTrue(corruptedCheckpoint.exists());
                 // Corrupt the valid checkpoint by replacing its content
                 final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId());
                 File validCheckpointFile = validCheckpointPath.toFile();
@@ -321,6 +321,13 @@ public class CheckpointingTest {
                 // Make sure the forged checkpoint recovery will start from the first available log
                 final long readableSmallestLSN = txnSubsystem.getLogManager().getReadableSmallestLSN();
                 Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() <= readableSmallestLSN);
+                // another call should still give us the forged checkpoint and the corrupted one should still be there
+                forgedCheckpoint = checkpointManager.getLatest();
+                Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() < minFirstLSN);
+                Assert.assertTrue(corruptedCheckpoint.exists());
+                // do a succesful checkpoint and ensure now the corrupted file was deleted
+                checkpointManager.doSharpCheckpoint();
+                Assert.assertFalse(corruptedCheckpoint.exists());
             } finally {
                 nc.deInit();
             }
diff --git a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
index 811a40d..64b07fd 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
@@ -54,4 +54,5 @@ messaging.frame.count=512
 txn.log.partitionsize=2MB
 txn.log.buffer.pagesize=128KB
 txn.log.checkpoint.pollfrequency=2147483647
+txn.log.checkpoint.history=0
 storage.max.active.writable.datasets=50
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 79db9ae..28b97ac 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -42,7 +42,7 @@
     "txn\.lock\.timeout\.waitthreshold" : 60000,
     "txn\.log\.buffer\.numpages" : 8,
     "txn\.log\.buffer\.pagesize" : 4194304,
-    "txn\.log\.checkpoint\.history" : 0,
+    "txn\.log\.checkpoint\.history" : 2,
     "txn\.log\.checkpoint\.lsnthreshold" : 67108864,
     "txn\.log\.checkpoint\.pollfrequency" : 120,
     "txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 6e5547d..51f5fb7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -42,7 +42,7 @@
     "txn\.lock\.timeout\.waitthreshold" : 60000,
     "txn\.log\.buffer\.numpages" : 8,
     "txn\.log\.buffer\.pagesize" : 4194304,
-    "txn\.log\.checkpoint\.history" : 0,
+    "txn\.log\.checkpoint\.history" : 2,
     "txn\.log\.checkpoint\.lsnthreshold" : 67108864,
     "txn\.log\.checkpoint\.pollfrequency" : 120,
     "txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 3237551..edec41e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -42,7 +42,7 @@
     "txn\.lock\.timeout\.waitthreshold" : 60000,
     "txn\.log\.buffer\.numpages" : 8,
     "txn\.log\.buffer\.pagesize" : 4194304,
-    "txn\.log\.checkpoint\.history" : 0,
+    "txn\.log\.checkpoint\.history" : 2,
     "txn\.log\.checkpoint\.lsnthreshold" : 67108864,
     "txn\.log\.checkpoint\.pollfrequency" : 120,
     "txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
index 3215cd6..b294828 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
@@ -52,7 +52,7 @@ public class TransactionProperties extends AbstractProperties {
                 INTEGER,
                 120,
                 "The frequency (in seconds) the checkpoint thread should check to see if a checkpoint should be written"),
-        TXN_LOG_CHECKPOINT_HISTORY(INTEGER, 0, "The number of checkpoints to keep in the transaction log"),
+        TXN_LOG_CHECKPOINT_HISTORY(INTEGER, 2, "The number of checkpoints to keep in the transaction log"),
         TXN_LOCK_ESCALATIONTHRESHOLD(
                 INTEGER,
                 1000,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index e221da8..81002be 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -26,6 +26,7 @@ import java.nio.channels.ClosedByInterruptException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -100,7 +101,7 @@ public abstract class AbstractCheckpointManager implements ICheckpointManager {
         if (checkpointFiles.isEmpty()) {
             return null;
         }
-        final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+        final List<Checkpoint> orderedCheckpoints = getOrderedValidCheckpoints(checkpointFiles, false);
         if (orderedCheckpoints.isEmpty()) {
             /*
              * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
@@ -136,8 +137,7 @@ public abstract class AbstractCheckpointManager implements ICheckpointManager {
     }
 
     public Path getCheckpointPath(long checkpointId) {
-        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
-                + Long.toString(checkpointId));
+        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + checkpointId);
     }
 
     protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
@@ -173,7 +173,8 @@ public abstract class AbstractCheckpointManager implements ICheckpointManager {
         // Write checkpoint file to disk
         try {
             byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
-            Files.write(path, bytes);
+            Files.write(path, bytes, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+            readCheckpoint(path);
         } catch (IOException e) {
             LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
             throw HyracksDataException.create(e);
@@ -200,16 +201,14 @@ public abstract class AbstractCheckpointManager implements ICheckpointManager {
         return Arrays.asList(checkpoints);
     }
 
-    private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
+    private List<Checkpoint> getOrderedValidCheckpoints(List<File> checkpoints, boolean deleteCorrupted) {
         List<Checkpoint> checkpointObjectList = new ArrayList<>();
         for (File file : checkpoints) {
             try {
                 if (LOGGER.isWarnEnabled()) {
                     LOGGER.log(Level.WARN, "Reading checkpoint file: " + file.getAbsolutePath());
                 }
-                final JsonNode jsonNode =
-                        OBJECT_MAPPER.readValue(Files.readAllBytes(Paths.get(file.getAbsolutePath())), JsonNode.class);
-                Checkpoint cp = (Checkpoint) persistedResourceRegistry.deserialize(jsonNode);
+                Checkpoint cp = readCheckpoint(Paths.get(file.getAbsolutePath()));
                 checkpointObjectList.add(cp);
             } catch (ClosedByInterruptException e) {
                 Thread.currentThread().interrupt();
@@ -222,9 +221,8 @@ public abstract class AbstractCheckpointManager implements ICheckpointManager {
                 if (LOGGER.isWarnEnabled()) {
                     LOGGER.log(Level.WARN, "Failed to read checkpoint file: " + file.getAbsolutePath(), e);
                 }
-                file.delete();
-                if (LOGGER.isWarnEnabled()) {
-                    LOGGER.log(Level.WARN, "Deleted corrupted checkpoint file: " + file.getAbsolutePath());
+                if (deleteCorrupted && file.delete()) {
+                    LOGGER.warn("Deleted corrupted checkpoint file: {}", file::getAbsolutePath);
                 }
             }
         }
@@ -234,7 +232,7 @@ public abstract class AbstractCheckpointManager implements ICheckpointManager {
 
     private void cleanup() {
         final List<File> checkpointFiles = getCheckpointFiles();
-        final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+        final List<Checkpoint> orderedCheckpoints = getOrderedValidCheckpoints(checkpointFiles, true);
         final int deleteCount = orderedCheckpoints.size() - historyToKeep;
         for (int i = 0; i < deleteCount; i++) {
             final Checkpoint checkpoint = orderedCheckpoints.get(i);
@@ -247,11 +245,20 @@ public abstract class AbstractCheckpointManager implements ICheckpointManager {
     }
 
     private long getNextCheckpointId() {
-        final Checkpoint latest = getLatest();
-        if (latest == null) {
+        final List<File> checkpointFiles = getCheckpointFiles();
+        if (checkpointFiles.isEmpty()) {
             return FIRST_CHECKPOINT_ID;
         }
-        return latest.getId() + 1;
+        long maxOnDiskId = -1;
+        for (File checkpointFile : checkpointFiles) {
+            long fileId = Long.parseLong(checkpointFile.getName().substring(CHECKPOINT_FILENAME_PREFIX.length()));
+            maxOnDiskId = Math.max(maxOnDiskId, fileId);
+        }
+        return maxOnDiskId + 1;
     }
 
-}
\ No newline at end of file
+    private Checkpoint readCheckpoint(Path checkpointPath) throws IOException {
+        final JsonNode jsonNode = OBJECT_MAPPER.readValue(Files.readAllBytes(checkpointPath), JsonNode.class);
+        return (Checkpoint) persistedResourceRegistry.deserialize(jsonNode);
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index 1992057..1965adc 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -94,8 +94,8 @@ public class CheckpointThread extends Thread {
                     if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
                         lastCheckpointLSN = currentCheckpointAttemptMinLSN;
                     }
-                } catch (HyracksDataException e) {
-                    LOGGER.log(Level.ERROR, "Error during checkpoint", e);
+                } catch (Exception e) {
+                    LOGGER.log(Level.ERROR, "checkpoint attempt failed", e);
                 }
             }
         }


[asterixdb] 02/03: [NO ISSUE][NET] Do Not Flush Partial Frames on Network Errors

Posted by mb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 8c9939184b41cab4d6d075f61fa967255d869f20
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Fri Jul 19 02:09:52 2019 +0300

    [NO ISSUE][NET] Do Not Flush Partial Frames on Network Errors
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Do not flush partially read frames when a network error
      is reported to avoid processing invalid partial frames
      before the network failure is detected by the channel
      reader.
    
    Change-Id: Ia35b58412cc293426c18a7cd66d8d0a5962db4e2
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3503
    Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Till Westmann <ti...@apache.org>
---
 .../org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java   | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index f7ef2aa..75d442f 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -124,7 +124,6 @@ public class ChannelControlBlock implements IChannelControlBlock {
     }
 
     void reportRemoteError(int ecode) {
-        ri.flush();
         ri.getFullBufferAcceptor().error(ecode);
         remoteClose.set(true);
     }