You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/08/14 00:29:55 UTC

asterixdb git commit: [NO ISSUE][NET] Skip Channel Write on Connection Failure

Repository: asterixdb
Updated Branches:
  refs/heads/master 98dd7f626 -> 8507f947e


[NO ISSUE][NET] Skip Channel Write on Connection Failure

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Do not attempt to adjust channel writability if
  the failure was due to a connection failure, since
  no more messages will be sent on that connection.
  This is done to prevent a possible deadlock between
  the network IOThread that detected the connection failure
  and another thread that might be accessing the channel.
- Make the conditions for sending an error code more explicit,
  since we currently have only a single error code that is sent.

Change-Id: Ic25f05ac2c0d02699324f2d1b80c51f392654106
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2892
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/8507f947
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/8507f947
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/8507f947

Branch: refs/heads/master
Commit: 8507f947eb8ff7586dcc0fa60829f843482a11b1
Parents: 98dd7f6
Author: Murtadha Hubail <mh...@apache.org>
Authored: Mon Aug 13 15:44:13 2018 -0700
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Mon Aug 13 17:29:38 2018 -0700

----------------------------------------------------------------------
 .../messaging/MessagingChannelWriteInterface.java |  4 ++--
 .../std/collectors/InputChannelFrameReader.java   |  2 +-
 .../collectors/NonDeterministicChannelReader.java |  2 +-
 .../muxdemux/AbstractChannelWriteInterface.java   | 18 +++++++++---------
 .../net/protocols/muxdemux/ChannelSet.java        |  2 +-
 .../muxdemux/FullFrameChannelWriteInterface.java  |  4 ++--
 6 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8507f947/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
index 43c1542..9ad870c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
@@ -46,10 +46,10 @@ public class MessagingChannelWriteInterface extends AbstractChannelWriteInterfac
             } else {
                 adjustChannelWritability();
             }
-        } else if (ecode >= 0 && !ecodeSent) {
+        } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
             writerState.getCommand().setChannelId(channelId);
             writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
-            writerState.getCommand().setData(ecode);
+            writerState.getCommand().setData(REMOTE_ERROR_CODE);
             writerState.reset(null, 0, null);
             ecodeSent = true;
             ccb.reportLocalEOS();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8507f947/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 5ce29c2..f32adcc 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -66,7 +66,7 @@ public class InputChannelFrameReader implements IFrameReader, IInputChannelMonit
             }
         }
         if (hasFailed()) {
-            if (errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE) {
+            if (errorCode == AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE) {
                 throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR);
             }
             // Do not throw exception here to allow the root cause exception gets propagated to the master first.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8507f947/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
index d8dc4b9..becbb00 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java
@@ -158,7 +158,7 @@ public class NonDeterministicChannelReader implements IInputChannelMonitor, IPar
         // Note: if a remote failure overwrites the value of localFailure, then we rely on
         // the fact that the remote task will notify the cc of the failure.
         // Otherwise, the local task must fail
-        localFailure = errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE;
+        localFailure = errorCode == AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE;
         failSenders.set(senderIndex);
         eosSenders.set(senderIndex);
         notifyAll();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8507f947/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 7ed9bfa..0a28e93 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.net.protocols.muxdemux;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -32,18 +33,18 @@ import org.apache.logging.log4j.Logger;
 public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface {
 
     public static final int NO_ERROR_CODE = 0;
+    public static final int CONNECTION_LOST_ERROR_CODE = -1;
     public static final int REMOTE_ERROR_CODE = 1;
-    public static final int LOCAL_ERROR_CODE = -1;
     private static final Logger LOGGER = LogManager.getLogger();
     protected final IChannelControlBlock ccb;
     protected final Queue<ByteBuffer> wiFullQueue;
+    protected final AtomicInteger ecode = new AtomicInteger(NO_ERROR_CODE);
     protected boolean channelWritabilityState;
     protected final int channelId;
     protected IBufferAcceptor eba;
     protected int credits;
     protected boolean eos;
     protected boolean eosSent;
-    protected int ecode;
     protected boolean ecodeSent;
     protected ByteBuffer currentWriteBuffer;
     private final ICloseableBufferAcceptor fba;
@@ -56,7 +57,6 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte
         credits = 0;
         eos = false;
         eosSent = false;
-        ecode = -1;
         ecodeSent = false;
     }
 
@@ -78,10 +78,7 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte
         if (eos && !eosSent) {
             return true;
         }
-        if (ecode >= 0 && !ecodeSent) {
-            return true;
-        }
-        return false;
+        return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
     }
 
     @Override
@@ -138,7 +135,7 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte
                     return;
                 }
                 eos = true;
-                if (ecode != REMOTE_ERROR_CODE) {
+                if (ecode.get() != REMOTE_ERROR_CODE) {
                     adjustChannelWritability();
                 }
             }
@@ -146,8 +143,11 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInte
 
         @Override
         public void error(int ecode) {
+            AbstractChannelWriteInterface.this.ecode.set(ecode);
+            if (ecode == CONNECTION_LOST_ERROR_CODE) {
+                return;
+            }
             synchronized (ccb) {
-                AbstractChannelWriteInterface.this.ecode = ecode;
                 adjustChannelWritability();
             }
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8507f947/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
index a546349..31a37ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -200,7 +200,7 @@ public class ChannelSet {
             for (int i = 0; i < ccbArray.length; ++i) {
                 ChannelControlBlock ccb = ccbArray[i];
                 if (ccb != null) {
-                    ccb.reportRemoteError(AbstractChannelWriteInterface.LOCAL_ERROR_CODE);
+                    ccb.reportRemoteError(AbstractChannelWriteInterface.CONNECTION_LOST_ERROR_CODE);
                     markEOSAck(i);
                     unmarkPendingCredits(i);
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8507f947/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index 17b70a8..628007d 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -50,10 +50,10 @@ public class FullFrameChannelWriteInterface extends AbstractChannelWriteInterfac
             } else {
                 adjustChannelWritability();
             }
-        } else if (ecode >= 0 && !ecodeSent) {
+        } else if (ecode.get() == REMOTE_ERROR_CODE && !ecodeSent) {
             writerState.getCommand().setChannelId(channelId);
             writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.ERROR);
-            writerState.getCommand().setData(ecode);
+            writerState.getCommand().setData(REMOTE_ERROR_CODE);
             writerState.reset(null, 0, null);
             ecodeSent = true;
             ccb.reportLocalEOS();