You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2022/03/28 07:22:06 UTC

[zeppelin] branch master updated: [ZEPPELIN-5569] Implement onError

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cee906  [ZEPPELIN-5569] Implement onError
4cee906 is described below

commit 4cee9068c72805693bcb630ed13fea50da24506e
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Mon Mar 21 10:27:57 2022 +0100

    [ZEPPELIN-5569] Implement onError
    
    ### What is this PR for?
    This PR implement `OnError`, which is necessary to cleanup the connectionManager, if possible.
    
    ### What type of PR is it?
    - BugFix
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5569
    
    ### How should this be tested?
    * I have done some manual testing, but it is difficult to test this case.
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #4260 from Reamer/onError and squashes the following commits:
    
    46d1c88f2 [Philipp Dallig] catch RuntimeException
    15f22d55f [Philipp Dallig] Add Metrics
    f9e537912 [Philipp Dallig] Implement onError
    8d591d7ed [Philipp Dallig] remove NotebookSocket from sessionIdNotebookSocketMap
---
 .../apache/zeppelin/socket/ConnectionManager.java  | 41 +++++++++---------
 .../org/apache/zeppelin/socket/NotebookServer.java | 49 +++++++++++++++-------
 .../org/apache/zeppelin/socket/NotebookSocket.java |  4 +-
 .../org/apache/zeppelin/utils/ServerUtils.java     | 37 ++++++++++++++++
 4 files changed, 97 insertions(+), 34 deletions(-)

diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
index 51c8fdf..a3a4089 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
@@ -20,6 +20,10 @@ package org.apache.zeppelin.socket;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tags;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.GUI;
@@ -46,6 +50,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -62,21 +67,21 @@ public class ConnectionManager {
       .setPrettyPrinting()
       .registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
 
-  final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
+  final Queue<NotebookSocket> connectedSockets = Metrics.gaugeCollectionSize("zeppelin_connected_sockets", Tags.empty(), new ConcurrentLinkedQueue<>());
   // noteId -> connection
-  final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
+  final Map<String, List<NotebookSocket>> noteSocketMap = Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>());
   // user -> connection
-  final Map<String, Queue<NotebookSocket>> userSocketMap = new HashMap<>();
+  final Map<String, Queue<NotebookSocket>> userSocketMap = Metrics.gaugeMapSize("zeppelin_user_sockets", Tags.empty(), new HashMap<>());
 
   /**
-   * This is a special endpoint in the notebook websoket, Every connection in this Queue
+   * This is a special endpoint in the notebook websocket, Every connection in this Queue
    * will be able to watch every websocket event, it doesnt need to be listed into the map of
    * noteSocketMap. This can be used to get information about websocket traffic and watch what
    * is going on.
    */
   final Queue<NotebookSocket> watcherSockets = new ConcurrentLinkedQueue<>();
 
-  private final HashSet<String> collaborativeModeList = new HashSet<>();
+  private final HashSet<String> collaborativeModeList = Metrics.gaugeCollectionSize("zeppelin_collaborative_modes", Tags.empty(),new HashSet<>());
   private final Boolean collaborativeModeEnable = ZeppelinConfiguration
       .create()
       .isZeppelinNotebookCollaborativeModeEnable();
@@ -151,11 +156,9 @@ public class ConnectionManager {
   public String getAssociatedNoteId(NotebookSocket socket) {
     String associatedNoteId = null;
     synchronized (noteSocketMap) {
-      Set<String> noteIds = noteSocketMap.keySet();
-      for (String noteId : noteIds) {
-        List<NotebookSocket> sockets = noteSocketMap.get(noteId);
-        if (sockets.contains(socket)) {
-          associatedNoteId = noteId;
+      for (Entry<String, List<NotebookSocket>> noteSocketMapEntry : noteSocketMap.entrySet()) {
+        if (noteSocketMapEntry.getValue().contains(socket)) {
+          associatedNoteId = noteSocketMapEntry.getKey();
         }
       }
     }
@@ -205,7 +208,7 @@ public class ConnectionManager {
       for (NotebookSocket ns : connectedSockets) {
         try {
           ns.send(serializeMessage(m));
-        } catch (IOException e) {
+        } catch (IOException | RuntimeException e) {
           LOGGER.error("Send error: {}", m, e);
         }
       }
@@ -226,7 +229,7 @@ public class ConnectionManager {
     for (NotebookSocket conn : socketsToBroadcast) {
       try {
         conn.send(serializeMessage(m));
-      } catch (IOException e) {
+      } catch (IOException | RuntimeException e) {
         LOGGER.error("socket error", e);
       }
     }
@@ -242,7 +245,7 @@ public class ConnectionManager {
                   .message(serializeMessage(message))
                   .build()
                   .toJson());
-        } catch (IOException e) {
+        } catch (IOException | RuntimeException e) {
           LOGGER.error("Cannot broadcast message to watcher", e);
         }
       }
@@ -267,7 +270,7 @@ public class ConnectionManager {
       }
       try {
         conn.send(serializeMessage(m));
-      } catch (IOException e) {
+      } catch (IOException | RuntimeException e) {
         LOGGER.error("socket error", e);
       }
     }
@@ -289,7 +292,7 @@ public class ConnectionManager {
 
         try {
           conn.send(serializedMsg);
-        } catch (IOException  e) {
+        } catch (IOException | RuntimeException e) {
           LOGGER.error("Cannot broadcast message to conn", e);
         }
       }
@@ -319,7 +322,7 @@ public class ConnectionManager {
   public void unicast(Message m, NotebookSocket conn) {
     try {
       conn.send(serializeMessage(m));
-    } catch (IOException e) {
+    } catch (IOException | RuntimeException e) {
       LOGGER.error("socket error", e);
     }
     broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
@@ -385,9 +388,9 @@ public class ConnectionManager {
 
   public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap) {
     if (null != userParagraphMap) {
-      for (String user : userParagraphMap.keySet()) {
-        multicastToUser(user,
-            new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)));
+      for (Entry<String, Paragraph> userParagraphEntry : userParagraphMap.entrySet()) {
+        multicastToUser(userParagraphEntry.getKey(),
+            new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphEntry.getValue()));
       }
     }
   }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 238545c..6ef9cb2 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -19,8 +19,13 @@ package org.apache.zeppelin.socket;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tags;
+
 import java.io.IOException;
 import java.lang.reflect.Type;
+import java.net.SocketTimeoutException;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -40,6 +45,7 @@ import javax.inject.Provider;
 import javax.websocket.CloseReason;
 import javax.websocket.EndpointConfig;
 import javax.websocket.OnClose;
+import javax.websocket.OnError;
 import javax.websocket.OnMessage;
 import javax.websocket.OnOpen;
 import javax.websocket.Session;
@@ -91,6 +97,7 @@ import org.apache.zeppelin.types.InterpreterSettingsList;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.util.IdHashes;
 import org.apache.zeppelin.utils.CorsUtils;
+import org.apache.zeppelin.utils.ServerUtils;
 import org.apache.zeppelin.utils.TestUtils;
 import org.eclipse.jetty.util.annotation.ManagedAttribute;
 import org.eclipse.jetty.util.annotation.ManagedObject;
@@ -147,7 +154,7 @@ public class NotebookServer implements AngularObjectRegistryListener,
       ZeppelinConfiguration.ConfVars.ZEPPELIN_WEBSOCKET_PARAGRAPH_STATUS_PROGRESS);
 
   // TODO(jl): This will be removed by handling session directly
-  private final Map<String, NotebookSocket> sessionIdNotebookSocketMap = new ConcurrentHashMap<>();
+  private final Map<String, NotebookSocket> sessionIdNotebookSocketMap = Metrics.gaugeMapSize("zeppelin_session_id_notebook_sockets", Tags.empty(), new ConcurrentHashMap<>());
   private ConnectionManager connectionManager;
   private ZeppelinConfiguration zeppelinConfiguration;
   private Provider<Notebook> notebookProvider;
@@ -245,7 +252,7 @@ public class NotebookServer implements AngularObjectRegistryListener,
   @OnOpen
   public void onOpen(Session session, EndpointConfig endpointConfig) throws IOException {
 
-    LOG.info("Session: {}, config: {}", session, endpointConfig.getUserProperties().keySet());
+    LOG.info("Open connection to {} with Session: {}, config: {}", ServerUtils.getRemoteAddress(session), session, endpointConfig.getUserProperties().keySet());
 
     Map<String, Object> headers = endpointConfig.getUserProperties();
     String origin = String.valueOf(headers.get(CorsUtils.HEADER_ORIGIN));
@@ -318,7 +325,6 @@ public class NotebookServer implements AngularObjectRegistryListener,
       if (StringUtils.isEmpty(conn.getUser())) {
         connectionManager.addUserConnection(receivedMessage.principal, conn);
       }
-
       ServiceContext context = getServiceContext(ticketEntry);
       // Lets be elegant here
       switch (receivedMessage.op) {
@@ -484,7 +490,7 @@ public class NotebookServer implements AngularObjectRegistryListener,
           break;
       }
     } catch (Exception e) {
-      LOG.error("Can't handle message: " + msg, e);
+      LOG.error("Can't handle message: {}", msg, e);
       try {
         conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", e.getMessage())));
       } catch (IOException iox) {
@@ -494,24 +500,39 @@ public class NotebookServer implements AngularObjectRegistryListener,
   }
 
   @OnClose
-  public void onClose(Session session, CloseReason closeReason) throws IOException {
-    NotebookSocket notebookSocket = sessionIdNotebookSocketMap.get(session.getId());
-    if (null == notebookSocket) {
-      session.close();
-    } else {
-      int code = closeReason.getCloseCode().getCode();
-      String reason = closeReason.getReasonPhrase();
-      onClose(notebookSocket, code, reason);
+  public void onClose(Session session, CloseReason closeReason) {
+    NotebookSocket notebookSocket = sessionIdNotebookSocketMap.remove(session.getId());
+    if (notebookSocket != null) {
+      LOG.info("Closed connection to {} ({}) {}", ServerUtils.getRemoteAddress(session), closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase());
+      removeConnection(notebookSocket);
     }
   }
 
-  public void onClose(NotebookSocket notebookSocket, int code, String reason) {
-    LOG.info("Closed connection to {} ({}) {}", notebookSocket, code, reason);
+  private void removeConnection(NotebookSocket notebookSocket) {
     connectionManager.removeConnection(notebookSocket);
     connectionManager.removeConnectionFromAllNote(notebookSocket);
     connectionManager.removeUserConnection(notebookSocket.getUser(), notebookSocket);
   }
 
+  @OnError
+  public void onError(Session session, Throwable error) {
+    if (session != null) {
+      NotebookSocket notebookSocket = sessionIdNotebookSocketMap.remove(session.getId());
+      if (notebookSocket != null) {
+        removeConnection(notebookSocket);
+      }
+    }
+    if (error instanceof SocketTimeoutException) {
+      LOG.warn("Socket Session to {} timed out", ServerUtils.getRemoteAddress(session));
+      LOG.debug("SocketTimeoutException", error);
+    } else if (error instanceof IOException) {
+      LOG.warn("Client {} is gone", ServerUtils.getRemoteAddress(session));
+      LOG.debug("IOException", error);
+    } else {
+      LOG.error("Error in WebSocket Session to {}", ServerUtils.getRemoteAddress(session), error);
+    }
+  }
+
   protected Message deserializeMessage(String msg) {
     return gson.fromJson(msg, Message.class);
   }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
index 609b4bb..58c41cd 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
@@ -17,6 +17,8 @@
 package org.apache.zeppelin.socket;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.utils.ServerUtils;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -54,6 +56,6 @@ public class NotebookSocket {
 
   @Override
   public String toString() {
-    return String.valueOf(session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"));
+    return ServerUtils.getRemoteAddress(session);
   }
 }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java
new file mode 100644
index 0000000..179a2a6
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.utils;
+
+import javax.websocket.Session;
+
+public class ServerUtils {
+  private ServerUtils() {
+    // do nothing
+  }
+
+  /**
+   *
+   * @param session
+   * @return the remote address of the websocket or "unknown" if session is null
+   */
+  public static String getRemoteAddress(Session session) {
+    if (session != null && session.getUserProperties() != null) {
+      return String.valueOf(session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"));
+    }
+    return "unknown";
+  }
+}