You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/11/16 21:30:49 UTC
zeppelin git commit: [ZEPPELIN-1610] - Add notebook watcher
Repository: zeppelin
Updated Branches:
refs/heads/master e0930570d -> 5e85e6ea6
[ZEPPELIN-1610] - Add notebook watcher
### What is this PR for?
Add a Simple way to switch a websocket connection to a new state; watcher.
A websocket watcher is a special connection that will watch most of the web socket even in Zeppelin, this cam be used to monitor zeppelin server activity.
### What type of PR is it?
[Feature]
### Todos
* [x] - Add watcher Queue
* [x] - Add endpoint to switch from regular client to watcher
* [x] - Add a way to generate a uniq key when zeppelin server restart
* [x] - Add example on how to use watcher.
### What is the Jira issue?
* [ZEPPELIN-1610](https://issues.apache.org/jira/browse/ZEPPELIN-1610)
### How should this be tested?
You will have to create your own websocket client and provide a valid http header (`X-Watcher-Key`) when you connect to zeppelin ws
something like
```
private Session openWatcherSession() {
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setHeader(WatcherSecurityKey.HTTP_HEADER, WatcherSecurityKey.getKey());
WatcherWebsocket socket = WatcherWebsocket.createInstace();
Future<Session> future = null;
Session session = null;
try {
future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
session = future.get();
} catch (IOException | InterruptedException | ExecutionException e) {
LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
return session;
}
return session;
}
```
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes
Author: Anthony Corbacho <co...@gmail.com>
Closes #1588 from anthonycorbacho/feat/updateWebsocketInZeppelinHubRepo and squashes the following commits:
26cf53f [Anthony Corbacho] Move broadcastToWatcher
8413e9b [Anthony Corbacho] Remove redundant broadcast
fb2c260 [Anthony Corbacho] Remove TODO in socket/Message
716c92c [Anthony Corbacho] Fix checkstyle
a10ba13 [Anthony Corbacho] Add remove connection from note back in test
89d70f2 [Anthony Corbacho] fix test
092791e [Anthony Corbacho] Light refactoring :: add missing header, add comment, refacto some methods
8f7e1b3 [Anthony Corbacho] Add X-Watcher-Key in request header for watcher client
e2d3053 [Anthony Corbacho] Add simple check for ws before switching ws client to watcher, client should provide a header X-Watcher-Key with a valid key (generated at runtime), if key invalid client wont be accepted
e25ea1e [Anthony Corbacho] Add simple Key generation for Watcher ws client
4affe25 [Anthony Corbacho] Handle remoing wssession from notebook map once the session is close :: avoiding socket connection to be ide
c32192a [Anthony Corbacho] rework watcher creation and ws session with notes
3bd3482 [Anthony Corbacho] Reorder import :: Google check style
bde5db4 [Anthony Corbacho] Update ping routine
ede1f18 [Anthony Corbacho] make private field public for accessibility
aa55a5a [Anthony Corbacho] Strting to rework ZeppelinClient
e5b3a1d [Anthony Corbacho] Add zeppelinhub notebook watcher
9d6c93f [Anthony Corbacho] Add new OP watcher
0d7f493 [Anthony Corbacho] Added new WS queue called watcher, watcher will be abler to listen to almost every note action performed in zeppelin notebook websocket server
45849ce [Anthony Corbacho] Add new message type :: Watcher message, this class will wrapp zeppelin ws message and add extra information such as noteId and user
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5e85e6ea
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5e85e6ea
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5e85e6ea
Branch: refs/heads/master
Commit: 5e85e6ea6ff004735fde9bef58085b83369f864b
Parents: e093057
Author: Anthony Corbacho <co...@gmail.com>
Authored: Mon Nov 14 14:56:38 2016 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Nov 16 13:30:42 2016 -0800
----------------------------------------------------------------------
.../apache/zeppelin/socket/NotebookServer.java | 85 ++++++++--
.../java/org/apache/zeppelin/notebook/Note.java | 10 +-
.../zeppelinhub/websocket/ZeppelinClient.java | 156 +++++++++++--------
.../websocket/listener/WatcherWebsocket.java | 81 ++++++++++
.../websocket/listener/ZeppelinWebsocket.java | 5 +-
.../websocket/scheduler/ZeppelinHeartbeat.java | 4 +-
.../zeppelin/notebook/socket/Message.java | 5 +-
.../notebook/socket/WatcherMessage.java | 73 +++++++++
.../zeppelin/util/WatcherSecurityKey.java | 35 +++++
.../websocket/ZeppelinClientTest.java | 10 +-
10 files changed, 378 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index bc0d7f5..c434ffe 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -48,16 +48,24 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
-import org.apache.zeppelin.notebook.*;
+import org.apache.zeppelin.notebook.JobListenerFactory;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.NotebookAuthorization;
+import org.apache.zeppelin.notebook.NotebookEventListener;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.ParagraphJobListener;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.apache.zeppelin.notebook.socket.WatcherMessage;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.ticket.TicketContainer;
import org.apache.zeppelin.types.InterpreterSettingsList;
import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.util.WatcherSecurityKey;
import org.apache.zeppelin.utils.InterpreterBindingUtils;
import org.apache.zeppelin.utils.SecurityUtils;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
@@ -67,6 +75,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -97,6 +106,14 @@ public class NotebookServer extends WebSocketServlet implements
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
+ /**
+ * This is a special endpoint in the notebook websoket, Every connection in this Queue
+ * will be able to watch every websocket event, it doesnt need to be listed into the map of
+ * noteSocketMap. This can be used to get information about websocket traffic and watch what
+ * is going on.
+ */
+ final Queue<NotebookSocket> watcherSockets = Queues.newConcurrentLinkedQueue();
+
private Notebook notebook() {
return ZeppelinServer.notebook;
}
@@ -275,6 +292,9 @@ public class NotebookServer extends WebSocketServlet implements
case GET_INTERPRETER_SETTINGS:
getInterpreterSettings(conn, subject);
break;
+ case WATCHER:
+ switchConnectionToWatcher(conn, messagereceived);
+ break;
default:
break;
}
@@ -389,6 +409,7 @@ public class NotebookServer extends WebSocketServlet implements
private void broadcast(String noteId, Message m) {
synchronized (noteSocketMap) {
+ broadcastToWatchers(noteId, StringUtils.EMPTY, m);
List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
@@ -406,6 +427,7 @@ public class NotebookServer extends WebSocketServlet implements
private void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
synchronized (noteSocketMap) {
+ broadcastToWatchers(noteId, StringUtils.EMPTY, m);
List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
@@ -431,11 +453,7 @@ public class NotebookServer extends WebSocketServlet implements
}
for (NotebookSocket conn: userConnectedSockets.get(user)) {
- try {
- conn.send(serializeMessage(m));
- } catch (IOException e) {
- LOG.error("socket error", e);
- }
+ unicast(m, conn);
}
}
@@ -445,6 +463,7 @@ public class NotebookServer extends WebSocketServlet implements
} catch (IOException e) {
LOG.error("socket error", e);
}
+ broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m);
}
public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException {
@@ -545,10 +564,8 @@ public class NotebookServer extends WebSocketServlet implements
broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
}
- public void broadcastInterpreterBindings(String noteId,
- List settingList) {
- broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS)
- .put("interpreterBindings", settingList));
+ public void broadcastInterpreterBindings(String noteId, List settingList) {
+ broadcast(noteId, new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList));
}
public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) {
@@ -1770,6 +1787,50 @@ public class NotebookServer extends WebSocketServlet implements
.get(settingId);
interpreterSetting.setInfos(metaInfos);
}
-
+
+ private void switchConnectionToWatcher(NotebookSocket conn, Message messagereceived)
+ throws IOException {
+ if (!isSessionAllowedToSwitchToWatcher(conn)) {
+ LOG.error("Cannot switch this client to watcher, invalid security key");
+ return;
+ }
+ LOG.info("Going to add {} to watcher socket", conn);
+ // add the connection to the watcher.
+ if (watcherSockets.contains(conn)) {
+ LOG.info("connection alrerady present in the watcher");
+ return;
+ }
+ watcherSockets.add(conn);
+
+ // remove this connection from regular zeppelin ws usage.
+ removeConnectionFromAllNote(conn);
+ connectedSockets.remove(conn);
+ removeUserConnection(conn.getUser(), conn);
+ }
+
+ private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) {
+ String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER);
+ return !(StringUtils.isBlank(watcherSecurityKey)
+ || !watcherSecurityKey.equals(WatcherSecurityKey.getKey()));
+ }
+
+ private void broadcastToWatchers(String noteId, String subject, Message message) {
+ synchronized (watcherSockets) {
+ if (watcherSockets.isEmpty()) {
+ return;
+ }
+ for (NotebookSocket watcher : watcherSockets) {
+ try {
+ watcher.send(WatcherMessage
+ .builder(noteId)
+ .subject(subject)
+ .message(serializeMessage(message))
+ .build()
+ .serialize());
+ } catch (IOException e) {
+ LOG.error("Cannot broadcast message to watcher", e);
+ }
+ }
+ }
+ }
}
-
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 66362bd..aa08adf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -30,12 +30,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -57,6 +52,11 @@ import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.search.SearchService;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
/**
* Binded interpreters for a note
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
index c40b504..e05a746 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -27,13 +28,14 @@ import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.WatcherWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener.ZeppelinWebsocket;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.protocol.ZeppelinhubMessage;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.SchedulerService;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHeartbeat;
-import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler.ZeppelinHubHeartbeat;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.apache.zeppelin.util.WatcherSecurityKey;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
@@ -54,11 +56,14 @@ public class ZeppelinClient {
private final String zeppelinhubToken;
private final WebSocketClient wsClient;
private static Gson gson;
- private ConcurrentHashMap<String, Session> zeppelinConnectionMap;
+ // Keep track of current open connection per notebook.
+ private ConcurrentHashMap<String, Session> notesConnection;
+ // Listen to every note actions.
+ private static Session watcherSession;
private static ZeppelinClient instance = null;
private SchedulerService schedulerService;
private Authentication authModule;
- private static final int min = 60;
+ private static final int MIN = 60;
public static ZeppelinClient initialize(String zeppelinUrl, String token,
ZeppelinConfiguration conf) {
@@ -77,7 +82,7 @@ public class ZeppelinClient {
zeppelinhubToken = token;
wsClient = createNewWebsocketClient();
gson = new Gson();
- zeppelinConnectionMap = new ConcurrentHashMap<>();
+ notesConnection = new ConcurrentHashMap<>();
schedulerService = SchedulerService.getInstance();
authModule = Authentication.initialize(token, conf);
if (authModule != null) {
@@ -89,7 +94,7 @@ public class ZeppelinClient {
private WebSocketClient createNewWebsocketClient() {
SslContextFactory sslContextFactory = new SslContextFactory();
WebSocketClient client = new WebSocketClient(sslContextFactory);
- client.setMaxIdleTimeout(5 * min * 1000);
+ client.setMaxIdleTimeout(5 * MIN * 1000);
client.setMaxTextMessageBufferSize(Client.getMaxNoteSize());
client.getPolicy().setMaxTextMessageSize(Client.getMaxNoteSize());
//TODO(khalid): other client settings
@@ -110,17 +115,26 @@ public class ZeppelinClient {
}
private void addRoutines() {
- schedulerService.add(ZeppelinHeartbeat.newInstance(this), 15, 4 * min);
+ schedulerService.add(ZeppelinHeartbeat.newInstance(this), 10, 1 * MIN);
+ new Timer().schedule(new java.util.TimerTask() {
+ @Override
+ public void run() {
+ watcherSession = openWatcherSession();
+ }
+ }, 5000);
}
public void stop() {
try {
if (wsClient != null) {
- removeAllZeppelinConnections();
+ removeAllConnections();
wsClient.stop();
} else {
LOG.warn("Cannot stop zeppelin websocket client - isn't initialized");
}
+ if (watcherSession != null) {
+ watcherSession.close();
+ }
} catch (Exception e) {
LOG.error("Cannot stop Zeppelin websocket client", e);
}
@@ -153,6 +167,22 @@ public class ZeppelinClient {
}
return msg;
}
+
+ private Session openWatcherSession() {
+ ClientUpgradeRequest request = new ClientUpgradeRequest();
+ request.setHeader(WatcherSecurityKey.HTTP_HEADER, WatcherSecurityKey.getKey());
+ WatcherWebsocket socket = WatcherWebsocket.createInstace();
+ Future<Session> future = null;
+ Session session = null;
+ try {
+ future = wsClient.connect(socket, zeppelinWebsocketUrl, request);
+ session = future.get();
+ } catch (IOException | InterruptedException | ExecutionException e) {
+ LOG.error("Couldn't establish websocket connection to Zeppelin ", e);
+ return session;
+ }
+ return session;
+ }
public void send(Message msg, String noteId) {
Session noteSession = getZeppelinConnection(noteId);
@@ -162,27 +192,16 @@ public class ZeppelinClient {
}
noteSession.getRemote().sendStringByFuture(serialize(msg));
}
-
- private boolean isSessionOpen(Session session) {
- return (session != null) && (session.isOpen());
- }
- /* per notebook based ws connection, returns null if can't connect */
public Session getZeppelinConnection(String noteId) {
if (StringUtils.isBlank(noteId)) {
- LOG.warn("Cannot return websocket connection for blank noteId");
+ LOG.warn("Cannot get Websocket session with blanck noteId");
return null;
}
-
- if (zeppelinConnectionMap.containsKey(noteId)) {
- LOG.info("Connection for {} exists in map", noteId);
- return getNoteSession(noteId);
- }
- //TODO(khalid): clean log later
- LOG.info("Creating Zeppelin websocket connection {} {}", zeppelinWebsocketUrl, noteId);
- return openNoteSession(noteId);
+ return getNoteSession(noteId);
}
-
+
+/*
private Message zeppelinGetNoteMsg(String noteId) {
Message getNoteMsg = new Message(Message.OP.GET_NOTE);
HashMap<String, Object> data = new HashMap<>();
@@ -190,12 +209,14 @@ public class ZeppelinClient {
getNoteMsg.data = data;
return getNoteMsg;
}
-
+ */
+
private Session getNoteSession(String noteId) {
- Session session = zeppelinConnectionMap.get(noteId);
- if (session == null || !session.isOpen()) {
- LOG.info("Not connection to {}", noteId);
- zeppelinConnectionMap.remove(noteId);
+ LOG.info("Getting Note websocket connection for note {}", noteId);
+ Session session = notesConnection.get(noteId);
+ if (!isSessionOpen(session)) {
+ LOG.info("No open connection for note {}, opening one", noteId);
+ notesConnection.remove(noteId);
session = openNoteSession(noteId);
}
return session;
@@ -214,17 +235,28 @@ public class ZeppelinClient {
return session;
}
- if (zeppelinConnectionMap.containsKey(noteId)) {
+ if (notesConnection.containsKey(noteId)) {
session.close();
- session = zeppelinConnectionMap.get(noteId);
+ session = notesConnection.get(noteId);
} else {
String getNote = serialize(zeppelinGetNoteMsg(noteId));
- // TODO(khalid): may need to check return whether successful
session.getRemote().sendStringByFuture(getNote);
- zeppelinConnectionMap.put(noteId, session);
+ notesConnection.put(noteId, session);
}
return session;
}
+
+ private boolean isSessionOpen(Session session) {
+ return (session != null) && (session.isOpen());
+ }
+
+ private Message zeppelinGetNoteMsg(String noteId) {
+ Message getNoteMsg = new Message(Message.OP.GET_NOTE);
+ HashMap<String, Object> data = new HashMap<String, Object>();
+ data.put("id", noteId);
+ getNoteMsg.data = data;
+ return getNoteMsg;
+ }
public void handleMsgFromZeppelin(String message, String noteId) {
Map<String, String> meta = new HashMap<>();
@@ -243,46 +275,48 @@ public class ZeppelinClient {
client.relayToZeppelinHub(hubMsg.serialize());
}
- /**
- * Close and remove ZeppelinConnection
- */
- public void removeZeppelinConnection(String noteId) {
- if (zeppelinConnectionMap.containsKey(noteId)) {
- Session conn = zeppelinConnectionMap.get(noteId);
- if (conn.isOpen()) {
- conn.close();
+ public void removeNoteConnection(String noteId) {
+ if (StringUtils.isBlank(noteId)) {
+ LOG.error("Cannot remove session for empty noteId");
+ return;
+ }
+ if (notesConnection.containsKey(noteId)) {
+ Session connection = notesConnection.get(noteId);
+ if (connection.isOpen()) {
+ connection.close();
}
- zeppelinConnectionMap.remove(noteId);
+ notesConnection.remove(noteId);
}
- // TODO(khalid): clean log later
- LOG.info("Removed Zeppelin ws connection for the following note {}", noteId);
+ LOG.info("Removed note websocket connection for note {}", noteId);
}
+
+ private void removeAllConnections() {
+ if (watcherSession != null && watcherSession.isOpen()) {
+ watcherSession.close();
+ }
- /**
- * Close and remove all ZeppelinConnection
- */
- public void removeAllZeppelinConnections() {
- for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
- if (isSessionOpen(entry.getValue())) {
- entry.getValue().close();
+ Session noteSession = null;
+ for (Map.Entry<String, Session> note: notesConnection.entrySet()) {
+ noteSession = note.getValue();
+ if (isSessionOpen(noteSession)) {
+ noteSession.close();
}
- zeppelinConnectionMap.remove(entry.getKey());
}
- LOG.info("Removed all Zeppelin ws connections");
+ notesConnection.clear();
}
- public void pingAllNotes() {
- for (Map.Entry<String, Session> entry: zeppelinConnectionMap.entrySet()) {
- if (isSessionOpen(entry.getValue())) {
- send(new Message(OP.PING), entry.getKey());
- } else {
- // for cleanup
- zeppelinConnectionMap.remove(entry.getKey());
- }
+ public void ping() {
+ if (watcherSession == null) {
+ LOG.info("Cannot send PING event, no watcher found");
+ return;
}
+ watcherSession.getRemote().sendStringByFuture(serialize(new Message(OP.PING)));
}
-
+
+ /**
+ * Only used in test.
+ */
public int countConnectedNotes() {
- return zeppelinConnectionMap.size();
+ return notesConnection.size();
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
new file mode 100644
index 0000000..5ccacb9
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/WatcherWebsocket.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.listener;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.ZeppelinClient;
+import org.apache.zeppelin.notebook.socket.Message;
+import org.apache.zeppelin.notebook.socket.Message.OP;
+import org.apache.zeppelin.notebook.socket.WatcherMessage;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * Zeppelin Watcher that will forward user note to ZeppelinHub.
+ *
+ */
+public class WatcherWebsocket implements WebSocketListener {
+ private static final Logger LOG = LoggerFactory.getLogger(ZeppelinWebsocket.class);
+ private static final Gson GSON = new Gson();
+ public Session connection;
+
+ public static WatcherWebsocket createInstace() {
+ return new WatcherWebsocket();
+ }
+
+ @Override
+ public void onWebSocketBinary(byte[] payload, int offset, int len) {
+ }
+
+ @Override
+ public void onWebSocketClose(int code, String reason) {
+ LOG.info("WatcherWebsocket connection closed with code: {}, message: {}", code, reason);
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+ LOG.info("WatcherWebsocket connection opened");
+ this.connection = session;
+ session.getRemote().sendStringByFuture(GSON.toJson(new Message(OP.WATCHER)));
+ }
+
+ @Override
+ public void onWebSocketError(Throwable cause) {
+ LOG.warn("WatcherWebsocket socket connection error ", cause);
+ }
+
+ @Override
+ public void onWebSocketText(String message) {
+ WatcherMessage watcherMsg = GSON.fromJson(message, WatcherMessage.class);
+ if (StringUtils.isBlank(watcherMsg.noteId)) {
+ return;
+ }
+ try {
+ ZeppelinClient zeppelinClient = ZeppelinClient.getInstance();
+ if (zeppelinClient != null) {
+ zeppelinClient.handleMsgFromZeppelin(watcherMsg.message, watcherMsg.noteId);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to send message to ZeppelinHub: ", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
index ac102b5..fa6ade8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/listener/ZeppelinWebsocket.java
@@ -43,7 +43,7 @@ public class ZeppelinWebsocket implements WebSocketListener {
@Override
public void onWebSocketClose(int code, String message) {
LOG.info("Zeppelin connection closed with code: {}, message: {}", code, message);
- // parentClient.removeConnMap(noteId);
+ ZeppelinClient.getInstance().removeNoteConnection(noteId);
}
@Override
@@ -54,7 +54,8 @@ public class ZeppelinWebsocket implements WebSocketListener {
@Override
public void onWebSocketError(Throwable e) {
- LOG.warn("Zeppelin socket connection error: {}", e.toString());
+ LOG.warn("Zeppelin socket connection error ", e);
+ ZeppelinClient.getInstance().removeNoteConnection(noteId);
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
index 8a85c84..11cfa45 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/scheduler/ZeppelinHeartbeat.java
@@ -38,7 +38,7 @@ public class ZeppelinHeartbeat implements Runnable {
@Override
public void run() {
- LOG.debug("Sending PING to all connected Zeppelin notes");
- client.pingAllNotes();
+ LOG.debug("Sending PING to Zeppelin Websocket Server");
+ client.ping();
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
index 9fe9636..b6a305c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
@@ -145,9 +145,12 @@ public class Message {
INTERPRETER_BINDINGS, // [s-c] interpreter bindings
GET_INTERPRETER_SETTINGS, // [c-s] get interpreter settings
INTERPRETER_SETTINGS, // [s-c] interpreter settings
- ERROR_INFO // [s-c] error information to be sent
+ ERROR_INFO, // [s-c] error information to be sent
+ WATCHER, // [s-c] Change websocket to watcher mode.
}
+ public static final Message EMPTY = new Message(null);
+
public OP op;
public Map<String, Object> data = new HashMap<>();
public String ticket = "anonymous";
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/WatcherMessage.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/WatcherMessage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/WatcherMessage.java
new file mode 100644
index 0000000..0fb28cc
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/WatcherMessage.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook.socket;
+
+import com.google.gson.Gson;
+
+/**
+ * Zeppelin websocket massage template class for watcher socket.
+ */
+public class WatcherMessage {
+
+ public String message;
+ public String noteId;
+ public String subject;
+
+ private static final Gson gson = new Gson();
+
+ public static Builder builder(String noteId) {
+ return new Builder(noteId);
+ }
+
+ private WatcherMessage(Builder builder) {
+ this.noteId = builder.noteId;
+ this.message = builder.message;
+ this.subject = builder.subject;
+ }
+
+ public String serialize() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Simple builder.
+ */
+ public static class Builder {
+ private final String noteId;
+ private String subject;
+ private String message;
+
+ public Builder(String noteId) {
+ this.noteId = noteId;
+ }
+
+ public Builder subject(String subject) {
+ this.subject = subject;
+ return this;
+ }
+
+ public Builder message(String message) {
+ this.message = message;
+ return this;
+ }
+
+ public WatcherMessage build() {
+ return new WatcherMessage(this);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WatcherSecurityKey.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WatcherSecurityKey.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WatcherSecurityKey.java
new file mode 100644
index 0000000..f0c3ad2
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/util/WatcherSecurityKey.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.util;
+
+import java.util.UUID;
+
+/**
+ * Simple implementation of a auto-generated key for websocket watcher.
+ * This is a LAZY implementation, we might want to update this later on :)
+ */
+public class WatcherSecurityKey {
+ public static final String HTTP_HEADER = "X-Watcher-Key";
+ private static final String KEY = UUID.randomUUID().toString();
+
+ protected WatcherSecurityKey() {}
+
+ public static String getKey() {
+ return KEY;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5e85e6ea/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
index b2cc81c..746e775 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClientTest.java
@@ -1,6 +1,10 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -74,7 +78,7 @@ public class ZeppelinClientTest {
assertEquals(connectionB, client.getZeppelinConnection("BBBB"));
// Remove connection to note AAAA
- client.removeZeppelinConnection("AAAA");
+ client.removeNoteConnection("AAAA");
assertEquals(client.countConnectedNotes(), 1);
assertNotEquals(connectionA, client.getZeppelinConnection("AAAA"));
assertEquals(client.countConnectedNotes(), 2);
@@ -117,7 +121,7 @@ public class ZeppelinClientTest {
msg.data = Maps.newHashMap();
msg.data.put("key", "value");
client.send(msg, "DDDD");
- client.removeZeppelinConnection("DDDD");
+ client.removeNoteConnection("DDDD");
client.stop();
}
}