You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2022/03/28 07:22:06 UTC
[zeppelin] branch master updated: [ZEPPELIN-5569] Implement onError
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 4cee906 [ZEPPELIN-5569] Implement onError
4cee906 is described below
commit 4cee9068c72805693bcb630ed13fea50da24506e
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Mon Mar 21 10:27:57 2022 +0100
[ZEPPELIN-5569] Implement onError
### What is this PR for?
This PR implement `OnError`, which is necessary to cleanup the connectionManager, if possible.
### What type of PR is it?
- BugFix
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5569
### How should this be tested?
* I have done some manual testing, but it is difficult to test this case.
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #4260 from Reamer/onError and squashes the following commits:
46d1c88f2 [Philipp Dallig] catch RuntimeException
15f22d55f [Philipp Dallig] Add Metrics
f9e537912 [Philipp Dallig] Implement onError
8d591d7ed [Philipp Dallig] remove NotebookSocket from sessionIdNotebookSocketMap
---
.../apache/zeppelin/socket/ConnectionManager.java | 41 +++++++++---------
.../org/apache/zeppelin/socket/NotebookServer.java | 49 +++++++++++++++-------
.../org/apache/zeppelin/socket/NotebookSocket.java | 4 +-
.../org/apache/zeppelin/utils/ServerUtils.java | 37 ++++++++++++++++
4 files changed, 97 insertions(+), 34 deletions(-)
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
index 51c8fdf..a3a4089 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
@@ -20,6 +20,10 @@ package org.apache.zeppelin.socket;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tags;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.GUI;
@@ -46,6 +50,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -62,21 +67,21 @@ public class ConnectionManager {
.setPrettyPrinting()
.registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
- final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
+ final Queue<NotebookSocket> connectedSockets = Metrics.gaugeCollectionSize("zeppelin_connected_sockets", Tags.empty(), new ConcurrentLinkedQueue<>());
// noteId -> connection
- final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
+ final Map<String, List<NotebookSocket>> noteSocketMap = Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>());
// user -> connection
- final Map<String, Queue<NotebookSocket>> userSocketMap = new HashMap<>();
+ final Map<String, Queue<NotebookSocket>> userSocketMap = Metrics.gaugeMapSize("zeppelin_user_sockets", Tags.empty(), new HashMap<>());
/**
- * This is a special endpoint in the notebook websoket, Every connection in this Queue
+ * This is a special endpoint in the notebook websocket, Every connection in this Queue
* will be able to watch every websocket event, it doesnt need to be listed into the map of
* noteSocketMap. This can be used to get information about websocket traffic and watch what
* is going on.
*/
final Queue<NotebookSocket> watcherSockets = new ConcurrentLinkedQueue<>();
- private final HashSet<String> collaborativeModeList = new HashSet<>();
+ private final HashSet<String> collaborativeModeList = Metrics.gaugeCollectionSize("zeppelin_collaborative_modes", Tags.empty(),new HashSet<>());
private final Boolean collaborativeModeEnable = ZeppelinConfiguration
.create()
.isZeppelinNotebookCollaborativeModeEnable();
@@ -151,11 +156,9 @@ public class ConnectionManager {
public String getAssociatedNoteId(NotebookSocket socket) {
String associatedNoteId = null;
synchronized (noteSocketMap) {
- Set<String> noteIds = noteSocketMap.keySet();
- for (String noteId : noteIds) {
- List<NotebookSocket> sockets = noteSocketMap.get(noteId);
- if (sockets.contains(socket)) {
- associatedNoteId = noteId;
+ for (Entry<String, List<NotebookSocket>> noteSocketMapEntry : noteSocketMap.entrySet()) {
+ if (noteSocketMapEntry.getValue().contains(socket)) {
+ associatedNoteId = noteSocketMapEntry.getKey();
}
}
}
@@ -205,7 +208,7 @@ public class ConnectionManager {
for (NotebookSocket ns : connectedSockets) {
try {
ns.send(serializeMessage(m));
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
LOGGER.error("Send error: {}", m, e);
}
}
@@ -226,7 +229,7 @@ public class ConnectionManager {
for (NotebookSocket conn : socketsToBroadcast) {
try {
conn.send(serializeMessage(m));
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
LOGGER.error("socket error", e);
}
}
@@ -242,7 +245,7 @@ public class ConnectionManager {
.message(serializeMessage(message))
.build()
.toJson());
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
LOGGER.error("Cannot broadcast message to watcher", e);
}
}
@@ -267,7 +270,7 @@ public class ConnectionManager {
}
try {
conn.send(serializeMessage(m));
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
LOGGER.error("socket error", e);
}
}
@@ -289,7 +292,7 @@ public class ConnectionManager {
try {
conn.send(serializedMsg);
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
LOGGER.error("Cannot broadcast message to conn", e);
}
}
@@ -319,7 +322,7 @@ public class ConnectionManager {
public void unicast(Message m, NotebookSocket conn) {
try {
conn.send(serializeMessage(m));
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
LOGGER.error("socket error", e);
}
broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
@@ -385,9 +388,9 @@ public class ConnectionManager {
public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap) {
if (null != userParagraphMap) {
- for (String user : userParagraphMap.keySet()) {
- multicastToUser(user,
- new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)));
+ for (Entry<String, Paragraph> userParagraphEntry : userParagraphMap.entrySet()) {
+ multicastToUser(userParagraphEntry.getKey(),
+ new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphEntry.getValue()));
}
}
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 238545c..6ef9cb2 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -19,8 +19,13 @@ package org.apache.zeppelin.socket;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tags;
+
import java.io.IOException;
import java.lang.reflect.Type;
+import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -40,6 +45,7 @@ import javax.inject.Provider;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
+import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
@@ -91,6 +97,7 @@ import org.apache.zeppelin.types.InterpreterSettingsList;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.util.IdHashes;
import org.apache.zeppelin.utils.CorsUtils;
+import org.apache.zeppelin.utils.ServerUtils;
import org.apache.zeppelin.utils.TestUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@@ -147,7 +154,7 @@ public class NotebookServer implements AngularObjectRegistryListener,
ZeppelinConfiguration.ConfVars.ZEPPELIN_WEBSOCKET_PARAGRAPH_STATUS_PROGRESS);
// TODO(jl): This will be removed by handling session directly
- private final Map<String, NotebookSocket> sessionIdNotebookSocketMap = new ConcurrentHashMap<>();
+ private final Map<String, NotebookSocket> sessionIdNotebookSocketMap = Metrics.gaugeMapSize("zeppelin_session_id_notebook_sockets", Tags.empty(), new ConcurrentHashMap<>());
private ConnectionManager connectionManager;
private ZeppelinConfiguration zeppelinConfiguration;
private Provider<Notebook> notebookProvider;
@@ -245,7 +252,7 @@ public class NotebookServer implements AngularObjectRegistryListener,
@OnOpen
public void onOpen(Session session, EndpointConfig endpointConfig) throws IOException {
- LOG.info("Session: {}, config: {}", session, endpointConfig.getUserProperties().keySet());
+ LOG.info("Open connection to {} with Session: {}, config: {}", ServerUtils.getRemoteAddress(session), session, endpointConfig.getUserProperties().keySet());
Map<String, Object> headers = endpointConfig.getUserProperties();
String origin = String.valueOf(headers.get(CorsUtils.HEADER_ORIGIN));
@@ -318,7 +325,6 @@ public class NotebookServer implements AngularObjectRegistryListener,
if (StringUtils.isEmpty(conn.getUser())) {
connectionManager.addUserConnection(receivedMessage.principal, conn);
}
-
ServiceContext context = getServiceContext(ticketEntry);
// Lets be elegant here
switch (receivedMessage.op) {
@@ -484,7 +490,7 @@ public class NotebookServer implements AngularObjectRegistryListener,
break;
}
} catch (Exception e) {
- LOG.error("Can't handle message: " + msg, e);
+ LOG.error("Can't handle message: {}", msg, e);
try {
conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", e.getMessage())));
} catch (IOException iox) {
@@ -494,24 +500,39 @@ public class NotebookServer implements AngularObjectRegistryListener,
}
@OnClose
- public void onClose(Session session, CloseReason closeReason) throws IOException {
- NotebookSocket notebookSocket = sessionIdNotebookSocketMap.get(session.getId());
- if (null == notebookSocket) {
- session.close();
- } else {
- int code = closeReason.getCloseCode().getCode();
- String reason = closeReason.getReasonPhrase();
- onClose(notebookSocket, code, reason);
+ public void onClose(Session session, CloseReason closeReason) {
+ NotebookSocket notebookSocket = sessionIdNotebookSocketMap.remove(session.getId());
+ if (notebookSocket != null) {
+ LOG.info("Closed connection to {} ({}) {}", ServerUtils.getRemoteAddress(session), closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase());
+ removeConnection(notebookSocket);
}
}
- public void onClose(NotebookSocket notebookSocket, int code, String reason) {
- LOG.info("Closed connection to {} ({}) {}", notebookSocket, code, reason);
+ private void removeConnection(NotebookSocket notebookSocket) {
connectionManager.removeConnection(notebookSocket);
connectionManager.removeConnectionFromAllNote(notebookSocket);
connectionManager.removeUserConnection(notebookSocket.getUser(), notebookSocket);
}
+ @OnError
+ public void onError(Session session, Throwable error) {
+ if (session != null) {
+ NotebookSocket notebookSocket = sessionIdNotebookSocketMap.remove(session.getId());
+ if (notebookSocket != null) {
+ removeConnection(notebookSocket);
+ }
+ }
+ if (error instanceof SocketTimeoutException) {
+ LOG.warn("Socket Session to {} timed out", ServerUtils.getRemoteAddress(session));
+ LOG.debug("SocketTimeoutException", error);
+ } else if (error instanceof IOException) {
+ LOG.warn("Client {} is gone", ServerUtils.getRemoteAddress(session));
+ LOG.debug("IOException", error);
+ } else {
+ LOG.error("Error in WebSocket Session to {}", ServerUtils.getRemoteAddress(session), error);
+ }
+ }
+
protected Message deserializeMessage(String msg) {
return gson.fromJson(msg, Message.class);
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
index 609b4bb..58c41cd 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
@@ -17,6 +17,8 @@
package org.apache.zeppelin.socket;
import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.utils.ServerUtils;
+
import java.io.IOException;
import java.util.Map;
@@ -54,6 +56,6 @@ public class NotebookSocket {
@Override
public String toString() {
- return String.valueOf(session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"));
+ return ServerUtils.getRemoteAddress(session);
}
}
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java
new file mode 100644
index 0000000..179a2a6
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.utils;
+
+import javax.websocket.Session;
+
+public class ServerUtils {
+ private ServerUtils() {
+ // do nothing
+ }
+
+ /**
+ *
+ * @param session
+ * @return the remote address of the websocket or "unknown" if session is null
+ */
+ public static String getRemoteAddress(Session session) {
+ if (session != null && session.getUserProperties() != null) {
+ return String.valueOf(session.getUserProperties().get("javax.websocket.endpoint.remoteAddress"));
+ }
+ return "unknown";
+ }
+}