You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2019/11/20 16:18:16 UTC

[asterixdb] branch master updated: [NO ISSUE][NET] Split delivery of messages and exceptions

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


View the commit online:
https://github.com/apache/asterixdb/commit/28f99d26f93bdfa1a14b67579bfe0694b7785a4e

The following commit(s) were added to refs/heads/master by this push:
     new 28f99d2  [NO ISSUE][NET] Split delivery of messages and exceptions
28f99d2 is described below

commit 28f99d26f93bdfa1a14b67579bfe0694b7785a4e
Author: Michael Blow <mb...@apache.org>
AuthorDate: Fri Nov 1 13:47:07 2019 -0400

    [NO ISSUE][NET] Split delivery of messages and exceptions
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Change-Id: I5a97e1eb1e2a3ec207591b3d5b8b7f1949a80fbc
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4025
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../hyracks/control/cc/ClientInterfaceIPCI.java    | 12 ++++++++-
 .../hyracks/control/cc/ClusterControllerIPCI.java  | 10 ++++---
 .../hyracks/control/nc/NodeControllerIPCI.java     | 11 ++++++--
 .../java/org/apache/hyracks/ipc/api/IIPCI.java     | 16 ++++++++---
 .../org/apache/hyracks/ipc/api/RPCInterface.java   | 22 ++++++++-------
 .../org/apache/hyracks/ipc/impl/IPCSystem.java     | 13 ++-------
 .../java/org/apache/hyracks/ipc/tests/IPCTest.java | 31 ++++++++++------------
 7 files changed, 68 insertions(+), 47 deletions(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index a78c269..2547476 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -64,7 +64,7 @@ class ClientInterfaceIPCI implements IIPCI {
     }
 
     @Override
-    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) {
         HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
         switch (fn.getFunctionId()) {
             case GET_CLUSTER_CONTROLLER_INFO:
@@ -200,4 +200,14 @@ class ClientInterfaceIPCI implements IIPCI {
                 }
         }
     }
+
+    @Override
+    public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+        LOGGER.info("exception in/or processing message", exception);
+        try {
+            handle.send(mid, null, exception);
+        } catch (IPCException e) {
+            LOGGER.warn("error sending exception response", e);
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 0e4ad41..d263cc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -60,8 +60,7 @@ class ClusterControllerIPCI implements IIPCI {
     }
 
     @Override
-    public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
-            Exception exception) {
+    public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload) {
         CCNCFunctions.Function fn = (Function) payload;
         switch (fn.getFunctionId()) {
             case REGISTER_NODE:
@@ -170,6 +169,11 @@ class ClusterControllerIPCI implements IIPCI {
         }
     }
 
+    @Override
+    public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+        LOGGER.info("exception in/or processing message", exception);
+    }
+
     private static void processNodeHeartbeat(ClusterControllerService ccs, CCNCFunctions.Function fn) {
         final ExecutorService executor = ccs.getExecutor();
         if (executor != null) {
@@ -177,4 +181,4 @@ class ClusterControllerIPCI implements IIPCI {
             executor.execute(new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData(), nhf.getNcAddress()));
         }
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 836c624..df08c04 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -37,12 +37,15 @@ import org.apache.hyracks.control.nc.work.UnDeployBinaryWork;
 import org.apache.hyracks.control.nc.work.UndeployJobSpecWork;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * Interprocess communication in a node controller
  * This class must be refactored with each function carrying its own implementation
  */
 final class NodeControllerIPCI implements IIPCI {
+    private static final Logger LOGGER = LogManager.getLogger();
     private final NodeControllerService ncs;
 
     /**
@@ -53,8 +56,7 @@ final class NodeControllerIPCI implements IIPCI {
     }
 
     @Override
-    public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
-            Exception exception) {
+    public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload) {
         CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
         switch (fn.getFunctionId()) {
             case SEND_APPLICATION_MESSAGE:
@@ -150,4 +152,9 @@ final class NodeControllerIPCI implements IIPCI {
         }
 
     }
+
+    @Override
+    public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+        LOGGER.info("exception in/or processing message", exception);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
index 02698fa..bf1bc33 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.ipc.api;
 /**
  * The interprocess communication interface that handles communication between different processes across the cluster
  */
-@FunctionalInterface
 public interface IIPCI {
 
     /**
@@ -34,8 +33,19 @@ public interface IIPCI {
      *            the request message id (if the message is a response to a request)
      * @param payload
      *            the message payload
+     */
+    void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload);
+
+    /**
+     * handles an error message, or failure to unmarshall the message
+     * @param handle
+     *            the message IPC handle
+     * @param mid
+     *            the message id
+     * @param rmid
+     *            the request message id (if the message is a response to a request)
      * @param exception
-     *            an exception if the message was an error message
+     *            an exception
      */
-    void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
+    void onError(IIPCHandle handle, long mid, long rmid, Exception exception);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
index 7dae541..fd98b5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
@@ -31,7 +31,7 @@ public class RPCInterface implements IIPCI {
     public Object call(IIPCHandle handle, Object request) throws Exception {
         Request req;
         long mid;
-        synchronized (this) {
+        synchronized (reqMap) {
             req = new Request(handle, this);
             mid = handle.send(-1, request, null);
             reqMap.put(mid, req);
@@ -40,21 +40,23 @@ public class RPCInterface implements IIPCI {
     }
 
     @Override
-    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+    public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) {
         Request req;
-        synchronized (this) {
+        synchronized (reqMap) {
             req = reqMap.remove(rmid);
         }
         assert req != null;
-        if (exception != null) {
-            req.setException(exception);
-        } else {
-            req.setResult(payload);
-        }
+        req.setResult(payload);
     }
 
-    protected synchronized void removeRequest(Request r) {
-        reqMap.remove(r);
+    @Override
+    public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+        Request req;
+        synchronized (reqMap) {
+            req = reqMap.remove(rmid);
+        }
+        assert req != null;
+        req.setException(exception);
     }
 
     private static class Request {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index 8d90ba3..4a19a33 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -29,13 +29,8 @@ import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
 import org.apache.hyracks.ipc.exceptions.IPCException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 public class IPCSystem {
-    private static final Logger LOGGER = LogManager.getLogger();
-
     private final IPCConnectionManager cMgr;
 
     private final IIPCI ipci;
@@ -101,15 +96,11 @@ public class IPCSystem {
     void deliverIncomingMessage(final Message message) {
         long mid = message.getMessageId();
         long rmid = message.getRequestMessageId();
-        Object payload = null;
-        Exception exception = null;
         if (message.getFlag() == Message.ERROR) {
-            exception = (Exception) message.getPayload();
-            LOGGER.log(Level.INFO, "Exception in message", exception);
+            ipci.onError(message.getIPCHandle(), mid, rmid, (Exception) message.getPayload());
         } else {
-            payload = message.getPayload();
+            ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, message.getPayload());
         }
-        ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, payload, exception);
     }
 
     IPCConnectionManager getConnectionManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
index 00bd761..0d8d69b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
@@ -47,7 +47,7 @@ public class IPCTest {
         IIPCHandle handle = client.getHandle(serverAddr, 0);
 
         for (int i = 0; i < 100; ++i) {
-            Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
+            Assert.assertEquals(rpci.call(handle, i), 2 * i);
         }
 
         try {
@@ -62,27 +62,24 @@ public class IPCTest {
         final Executor executor = Executors.newCachedThreadPool();
         IIPCI ipci = new IIPCI() {
             @Override
-            public void deliverIncomingMessage(final IIPCHandle handle, final long mid, long rmid, final Object payload,
-                    Exception exception) {
-                executor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        Object result = null;
-                        Exception exception = null;
+            public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) {
+                executor.execute(() -> {
+                    try {
+                        handle.send(mid, (int) payload * 2, null);
+                    } catch (Exception e) {
                         try {
-                            Integer i = (Integer) payload;
-                            result = i.intValue() * 2;
-                        } catch (Exception e) {
-                            exception = e;
-                        }
-                        try {
-                            handle.send(mid, result, exception);
-                        } catch (IPCException e) {
-                            e.printStackTrace();
+                            handle.send(mid, null, e);
+                        } catch (IPCException e1) {
+                            e1.printStackTrace();
                         }
                     }
                 });
             }
+
+            @Override
+            public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+                exception.printStackTrace();
+            }
         };
         return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), PlainSocketChannelFactory.INSTANCE, ipci,
                 new JavaSerializationBasedPayloadSerializerDeserializer());