You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/08/05 08:18:07 UTC
incubator-zeppelin git commit: ZEPPELIN-172 Websocket connection without separate port
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master b9583c6e0 -> 3a42a28b0
ZEPPELIN-172 Websocket connection without separate port
This PR fixes https://issues.apache.org/jira/browse/ZEPPELIN-172
Author: Lee moon soo <mo...@apache.org>
Author: Sjoerd Mulder <sj...@frontendless.nl>
Author: Lee moon soo <le...@gmail.com>
Closes #170 from Leemoonsoo/websocket and squashes the following commits:
11a302a [Lee moon soo] Check text in more safe way
3cf839d [Lee moon soo] Merge pull request #2 from sjoerdmulder/websocket
7f8bc47 [Sjoerd Mulder] Cleanup of Javascript logic and Server code detecting the correct port
412927f [Lee moon soo] Handle large message
f56e417 [Lee moon soo] Add license header
806db9b [Lee moon soo] Remove websocket addr/port configuration
6180ed3 [Lee moon soo] Update README
85d14a0 [Lee moon soo] Create notebookserver instance manually
a7b82aa [Lee moon soo] Initial implementation of Websocket inside of Jetty server
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/3a42a28b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/3a42a28b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/3a42a28b
Branch: refs/heads/master
Commit: 3a42a28b01f9a3faf88b5a82b2901af8fc4a16a5
Parents: b9583c6
Author: Lee moon soo <mo...@apache.org>
Authored: Mon Aug 3 05:01:00 2015 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Aug 5 15:18:01 2015 +0900
----------------------------------------------------------------------
README.md | 2 +-
conf/zeppelin-site.xml.template | 17 +-
zeppelin-server/pom.xml | 22 +-
.../zeppelin/server/AppScriptServlet.java | 95 --------
.../apache/zeppelin/server/ZeppelinServer.java | 54 ++---
.../apache/zeppelin/socket/NotebookServer.java | 236 ++++++++++---------
.../apache/zeppelin/socket/NotebookSocket.java | 73 ++++++
.../zeppelin/socket/NotebookSocketListener.java | 26 ++
.../socket/SslWebSocketServerFactory.java | 76 ------
.../java/org/apache/zeppelin/ZeppelinIT.java | 46 ++--
.../zeppelin/rest/AbstractTestRestApi.java | 1 -
.../src/components/baseUrl/baseUrl.service.js | 46 +---
.../websocketEvents/websocketEvents.factory.js | 2 +-
.../zeppelin/conf/ZeppelinConfiguration.java | 22 --
14 files changed, 303 insertions(+), 415 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 67d530e..d565858 100644
--- a/README.md
+++ b/README.md
@@ -116,7 +116,7 @@ Yarn
### Run
./bin/zeppelin-daemon.sh start
- browse localhost:8080 in your browser. 8081 port should be accessible for websocket connection.
+ browse localhost:8080 in your browser.
For configuration details check __./conf__ subdirectory.
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 8d0a7f1..13e4d1d 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -28,22 +28,7 @@
<property>
<name>zeppelin.server.port</name>
<value>8080</value>
- <description>Server port. port+1 is used for web socket.</description>
-</property>
-
-<property>
- <name>zeppelin.websocket.addr</name>
- <value>0.0.0.0</value>
- <description>Testing websocket address</description>
-</property>
-
-<!-- If the port value is negative, then it'll default to the server
- port + 1.
- -->
-<property>
- <name>zeppelin.websocket.port</name>
- <value>-1</value>
- <description>Testing websocket port</description>
+ <description>Server port.</description>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index e85a3ae..2b43e1b 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -95,12 +95,6 @@
<version>${cxf.version}</version>
</dependency>
- <dependency>
- <groupId>org.java-websocket</groupId>
- <artifactId>Java-WebSocket</artifactId>
- <version>1.3.0</version>
- </dependency>
-
<!-- Swagger -->
<dependency>
<groupId>com.wordnik</groupId>
@@ -297,19 +291,9 @@
</dependency>
<dependency>
- <groupId>org.atmosphere</groupId>
- <artifactId>atmosphere-jersey</artifactId>
- <version>2.2.0</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.ws.rs</groupId>
- <artifactId>javax.ws.rs-api</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-servlet</artifactId>
+ <version>1.13</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java
deleted file mode 100644
index 7a31461..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.server;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.eclipse.jetty.servlet.DefaultServlet;
-import org.eclipse.jetty.util.resource.Resource;
-
-/**
- * Simple servlet to dynamically set the Websocket port
- * in the JavaScript sent to the client
- */
-public class AppScriptServlet extends DefaultServlet {
-
- // Hash containing the possible scripts that contain the getPort()
- // function originally defined in app.js
- private static Set<String> scriptPaths = new HashSet<String>(
- Arrays.asList(
- "/scripts/scripts.js",
- "/components/baseUrl/baseUrl.js"
- )
- );
-
- private int websocketPort;
-
- public AppScriptServlet(int websocketPort) {
- this.websocketPort = websocketPort;
- }
-
- @Override
- protected void doGet(HttpServletRequest request, HttpServletResponse response)
- throws ServletException,
- IOException {
-
- // Process all requests not for the app script to the parent
- // class
- String uri = request.getRequestURI();
- if (!scriptPaths.contains(uri)) {
- super.doGet(request, response);
- return;
- }
-
- // Read the script file chunk by chunk
- Resource scriptFile = getResource(uri);
- InputStream is = scriptFile.getInputStream();
- StringBuffer script = new StringBuffer();
- byte[] buffer = new byte[1024];
- while (is.available() > 0) {
- int numRead = is.read(buffer);
- if (numRead <= 0) {
- break;
- }
- script.append(new String(buffer, 0, numRead, "UTF-8"));
- }
-
- // Replace the getPort function to return the proper value
- String startReplaceString = "/* @preserve AppScriptServlet - getPort */";
- String endReplaceString = "/* @preserve AppScriptServlet - close */";
-
- int startIndex = script.indexOf(startReplaceString);
- int endIndex = script.indexOf(endReplaceString, startIndex);
-
- if (startIndex >= 0 && endIndex >= 0) {
- String replaceString = "this.getPort=function(){return " + websocketPort + "};";
- script.replace(startIndex, endIndex + endReplaceString.length(), replaceString);
- }
-
- response.getWriter().println(script.toString());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 2bd23bb..ad1d907 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -26,6 +26,7 @@ import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.servlet.DispatcherType;
+import javax.servlet.Servlet;
import javax.ws.rs.core.Application;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
@@ -40,13 +41,14 @@ import org.apache.zeppelin.rest.NotebookRestApi;
import org.apache.zeppelin.rest.ZeppelinRestApi;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.socket.NotebookServer;
-import org.apache.zeppelin.socket.SslWebSocketServerFactory;
+import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.bio.SocketConnector;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.server.session.SessionHandler;
-import org.eclipse.jetty.server.ssl.SslSocketConnector;
+import org.eclipse.jetty.server.ssl.SslSelectChannelConnector;
+import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -83,8 +85,6 @@ public class ZeppelinServer extends Application {
conf.setProperty("args", args);
jettyServer = setupJettyServer(conf);
- notebookServer = setupNotebookServer(conf);
- notebookServer.start();
// REST api
final ServletContextHandler restApi = setupRestApiContextHandler();
@@ -93,17 +93,18 @@ public class ZeppelinServer extends Application {
*/
final ServletContextHandler swagger = setupSwaggerContextHandler(conf);
+ // Notebook server
+ final ServletContextHandler notebook = setupNotebookServer(conf);
+
// Web UI
- LOG.info("Create zeppelin websocket on {}:{}", notebookServer.getAddress()
- .getAddress(), notebookServer.getPort());
- final WebAppContext webApp = setupWebAppContext(conf, notebookServer.getPort());
+ final WebAppContext webApp = setupWebAppContext(conf);
//Below is commented since zeppelin-docs module is removed.
//final WebAppContext webAppSwagg = setupWebAppSwagger(conf);
// add all handlers
ContextHandlerCollection contexts = new ContextHandlerCollection();
//contexts.setHandlers(new Handler[]{swagger, restApi, webApp, webAppSwagg});
- contexts.setHandlers(new Handler[]{swagger, restApi, webApp});
+ contexts.setHandlers(new Handler[]{swagger, restApi, notebook, webApp});
jettyServer.setHandler(contexts);
LOG.info("Start zeppelin server");
@@ -114,10 +115,7 @@ public class ZeppelinServer extends Application {
@Override public void run() {
LOG.info("Shutting down Zeppelin Server ... ");
try {
- notebook.getInterpreterFactory().close();
-
jettyServer.stop();
- notebookServer.stop();
} catch (Exception e) {
LOG.error("Error while stopping servlet container", e);
}
@@ -142,12 +140,12 @@ public class ZeppelinServer extends Application {
private static Server setupJettyServer(ZeppelinConfiguration conf)
throws Exception {
- SocketConnector connector;
+ AbstractConnector connector;
if (conf.useSsl()) {
- connector = new SslSocketConnector(getSslContextFactory(conf));
+ connector = new SslSelectChannelConnector(getSslContextFactory(conf));
}
else {
- connector = new SocketConnector();
+ connector = new SelectChannelConnector();
}
// Set some timeout options to make debugging easier.
@@ -163,20 +161,22 @@ public class ZeppelinServer extends Application {
return server;
}
- private static NotebookServer setupNotebookServer(ZeppelinConfiguration conf)
+ private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf)
throws Exception {
- NotebookServer server = new NotebookServer(conf.getWebSocketAddress(), conf.getWebSocketPort());
+ notebookServer = new NotebookServer();
+ final ServletHolder servletHolder = new ServletHolder(notebookServer);
+ servletHolder.setInitParameter("maxTextMessageSize", "1024000");
- // Default WebSocketServer uses unencrypted connector, so only need to
- // change the connector if SSL should be used.
- if (conf.useSsl()) {
- SslWebSocketServerFactory wsf = new SslWebSocketServerFactory(getSslContext(conf));
- wsf.setNeedClientAuth(conf.useClientAuth());
- server.setWebSocketFactory(wsf);
- }
+ final ServletContextHandler cxfContext = new ServletContextHandler(
+ ServletContextHandler.SESSIONS);
- return server;
+ cxfContext.setSessionHandler(new SessionHandler());
+ cxfContext.setContextPath("/");
+ cxfContext.addServlet(servletHolder, "/ws/*");
+ cxfContext.addFilter(new FilterHolder(CorsFilter.class), "/*",
+ EnumSet.allOf(DispatcherType.class));
+ return cxfContext;
}
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf)
@@ -257,7 +257,7 @@ public class ZeppelinServer extends Application {
}
private static WebAppContext setupWebAppContext(
- ZeppelinConfiguration conf, int websocketPort) {
+ ZeppelinConfiguration conf) {
WebAppContext webApp = new WebAppContext();
File warPath = new File(conf.getString(ConfVars.ZEPPELIN_WAR));
@@ -273,7 +273,7 @@ public class ZeppelinServer extends Application {
}
// Explicit bind to root
webApp.addServlet(
- new ServletHolder(new AppScriptServlet(websocketPort)),
+ new ServletHolder(new DefaultServlet()),
"/*"
);
return webApp;
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 90a2a95..ed35ea1 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.servlet.http.HttpServletRequest;
+
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
@@ -39,9 +41,8 @@ import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.socket.Message.OP;
-import org.java_websocket.WebSocket;
-import org.java_websocket.handshake.ClientHandshake;
-import org.java_websocket.server.WebSocketServer;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.eclipse.jetty.websocket.WebSocketServlet;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,40 +55,36 @@ import com.google.gson.Gson;
*
* @author anthonycorbacho
*/
-public class NotebookServer extends WebSocketServer implements
- JobListenerFactory, AngularObjectRegistryListener {
+public class NotebookServer extends WebSocketServlet implements
+ NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener {
- private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
- private static final String DEFAULT_ADDR = "0.0.0.0";
- private static final int DEFAULT_PORT = 8282;
+ private static final Logger LOG = LoggerFactory
+ .getLogger(NotebookServer.class);
Gson gson = new Gson();
- Map<String, List<WebSocket>> noteSocketMap = new HashMap<String, List<WebSocket>>();
- List<WebSocket> connectedSockets = new LinkedList<WebSocket>();
-
- public NotebookServer() {
- super(new InetSocketAddress(DEFAULT_ADDR, DEFAULT_PORT));
- }
-
- public NotebookServer(String address, int port) {
- super(new InetSocketAddress(address, port));
- }
+ Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<String, List<NotebookSocket>>();
+ List<NotebookSocket> connectedSockets = new LinkedList<NotebookSocket>();
private Notebook notebook() {
return ZeppelinServer.notebook;
}
@Override
- public void onOpen(WebSocket conn, ClientHandshake handshake) {
- LOG.info("New connection from {} : {}", conn.getRemoteSocketAddress().getHostName(), conn
- .getRemoteSocketAddress().getPort());
+ public WebSocket doWebSocketConnect(HttpServletRequest req, String protocol) {
+ return new NotebookSocket(req, protocol, this);
+ }
+
+ @Override
+ public void onOpen(NotebookSocket conn) {
+ LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
+ conn.getRequest().getRemotePort());
synchronized (connectedSockets) {
connectedSockets.add(conn);
}
}
@Override
- public void onMessage(WebSocket conn, String msg) {
+ public void onMessage(NotebookSocket conn, String msg) {
Notebook notebook = notebook();
try {
Message messagereceived = deserializeMessage(msg);
@@ -132,7 +129,7 @@ public class NotebookServer extends WebSocketServer implements
break;
case PING:
pong();
- break;
+ break;
case ANGULAR_OBJECT_UPDATED:
angularObjectUpdated(conn, notebook, messagereceived);
break;
@@ -146,17 +143,9 @@ public class NotebookServer extends WebSocketServer implements
}
@Override
- public void onClose(WebSocket conn, int code, String reason, boolean remote) {
- LOG.info("Closed connection to {} : {}", conn.getRemoteSocketAddress().getHostName(), conn
- .getRemoteSocketAddress().getPort());
- removeConnectionFromAllNote(conn);
- synchronized (connectedSockets) {
- connectedSockets.remove(conn);
- }
- }
-
- @Override
- public void onError(WebSocket conn, Exception message) {
+ public void onClose(NotebookSocket conn, int code, String reason) {
+ LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest()
+ .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
removeConnectionFromAllNote(conn);
synchronized (connectedSockets) {
connectedSockets.remove(conn);
@@ -172,12 +161,13 @@ public class NotebookServer extends WebSocketServer implements
return gson.toJson(m);
}
- private void addConnectionToNote(String noteId, WebSocket socket) {
+ private void addConnectionToNote(String noteId, NotebookSocket socket) {
synchronized (noteSocketMap) {
- removeConnectionFromAllNote(socket); // make sure a socket relates only a single note.
- List<WebSocket> socketList = noteSocketMap.get(noteId);
+ removeConnectionFromAllNote(socket); // make sure a socket relates only a
+ // single note.
+ List<NotebookSocket> socketList = noteSocketMap.get(noteId);
if (socketList == null) {
- socketList = new LinkedList<WebSocket>();
+ socketList = new LinkedList<NotebookSocket>();
noteSocketMap.put(noteId, socketList);
}
@@ -187,9 +177,9 @@ public class NotebookServer extends WebSocketServer implements
}
}
- private void removeConnectionFromNote(String noteId, WebSocket socket) {
+ private void removeConnectionFromNote(String noteId, NotebookSocket socket) {
synchronized (noteSocketMap) {
- List<WebSocket> socketList = noteSocketMap.get(noteId);
+ List<NotebookSocket> socketList = noteSocketMap.get(noteId);
if (socketList != null) {
socketList.remove(socket);
}
@@ -198,11 +188,11 @@ public class NotebookServer extends WebSocketServer implements
private void removeNote(String noteId) {
synchronized (noteSocketMap) {
- List<WebSocket> socketList = noteSocketMap.remove(noteId);
+ List<NotebookSocket> socketList = noteSocketMap.remove(noteId);
}
}
- private void removeConnectionFromAllNote(WebSocket socket) {
+ private void removeConnectionFromAllNote(NotebookSocket socket) {
synchronized (noteSocketMap) {
Set<String> keys = noteSocketMap.keySet();
for (String noteId : keys) {
@@ -211,12 +201,12 @@ public class NotebookServer extends WebSocketServer implements
}
}
- private String getOpenNoteId(WebSocket socket) {
+ private String getOpenNoteId(NotebookSocket socket) {
String id = null;
synchronized (noteSocketMap) {
Set<String> keys = noteSocketMap.keySet();
for (String noteId : keys) {
- List<WebSocket> sockets = noteSocketMap.get(noteId);
+ List<NotebookSocket> sockets = noteSocketMap.get(noteId);
if (sockets.contains(socket)) {
id = noteId;
}
@@ -225,7 +215,8 @@ public class NotebookServer extends WebSocketServer implements
return id;
}
- private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) {
+ private void broadcastToNoteBindedInterpreter(String interpreterGroupId,
+ Message m) {
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
@@ -240,23 +231,31 @@ public class NotebookServer extends WebSocketServer implements
private void broadcast(String noteId, Message m) {
synchronized (noteSocketMap) {
- List<WebSocket> socketLists = noteSocketMap.get(noteId);
+ List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
}
LOG.info("SEND >> " + m.op);
- for (WebSocket conn : socketLists) {
- conn.send(serializeMessage(m));
+ for (NotebookSocket conn : socketLists) {
+ try {
+ conn.send(serializeMessage(m));
+ } catch (IOException e) {
+ LOG.error("socket error", e);
+ }
}
}
}
private void broadcastAll(Message m) {
synchronized (connectedSockets) {
- for (WebSocket conn : connectedSockets) {
- conn.send(serializeMessage(m));
+ for (NotebookSocket conn : connectedSockets) {
+ try {
+ conn.send(serializeMessage(m));
+ } catch (IOException e) {
+ LOG.error("socket error", e);
+ }
}
}
}
@@ -278,7 +277,8 @@ public class NotebookServer extends WebSocketServer implements
broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
}
- private void sendNote(WebSocket conn, Notebook notebook, Message fromMessage) {
+ private void sendNote(NotebookSocket conn, Notebook notebook,
+ Message fromMessage) throws IOException {
String noteId = (String) fromMessage.get("id");
if (noteId == null) {
return;
@@ -296,7 +296,8 @@ public class NotebookServer extends WebSocketServer implements
throws SchedulerException, IOException {
String noteId = (String) fromMessage.get("id");
String name = (String) fromMessage.get("name");
- Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+ Map<String, Object> config = (Map<String, Object>) fromMessage
+ .get("config");
if (noteId == null) {
return;
}
@@ -319,7 +320,8 @@ public class NotebookServer extends WebSocketServer implements
}
}
- private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) {
+ private boolean isCronUpdated(Map<String, Object> configA,
+ Map<String, Object> configB) {
boolean cronUpdated = false;
if (configA.get("cron") != null && configB.get("cron") != null
&& configA.get("cron").equals(configB.get("cron"))) {
@@ -352,14 +354,16 @@ public class NotebookServer extends WebSocketServer implements
broadcastNoteList();
}
- private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
+ private void updateParagraph(NotebookSocket conn, Notebook notebook,
+ Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
}
- Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
- Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+ Map<String, Object> params = (Map<String, Object>) fromMessage
+ .get("params");
+ Map<String, Object> config = (Map<String, Object>) fromMessage
+ .get("config");
final Note note = notebook.getNote(getOpenNoteId(conn));
Paragraph p = note.getParagraph(paragraphId);
p.settings.setParams(params);
@@ -370,8 +374,8 @@ public class NotebookServer extends WebSocketServer implements
broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p));
}
- private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
+ private void removeParagraph(NotebookSocket conn, Notebook notebook,
+ Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
@@ -385,7 +389,8 @@ public class NotebookServer extends WebSocketServer implements
}
}
- private void completion(WebSocket conn, Notebook notebook, Message fromMessage) {
+ private void completion(NotebookSocket conn, Notebook notebook,
+ Message fromMessage) throws IOException {
String paragraphId = (String) fromMessage.get("id");
String buffer = (String) fromMessage.get("buf");
int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString());
@@ -404,6 +409,7 @@ public class NotebookServer extends WebSocketServer implements
/**
* When angular object updated from client
+ *
* @param conn
* @param notebook
* @param fromMessage
@@ -417,12 +423,12 @@ public class NotebookServer extends WebSocketServer implements
AngularObject ao = null;
boolean global = false;
-
-
+
// propagate change to (Remote) AngularObjectRegistry
Note note = notebook.getNote(noteId);
if (note != null) {
- List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
+ List<InterpreterSetting> settings = note.getNoteReplLoader()
+ .getInterpreterSettings();
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup() == null) {
continue;
@@ -433,7 +439,7 @@ public class NotebookServer extends WebSocketServer implements
.getInterpreterGroup().getAngularObjectRegistry();
// first trying to get local registry
- ao = angularObjectRegistry.get(varName, noteId);
+ ao = angularObjectRegistry.get(varName, noteId);
if (ao == null) {
// then try global registry
ao = angularObjectRegistry.get(varName, null);
@@ -454,26 +460,29 @@ public class NotebookServer extends WebSocketServer implements
}
}
}
-
- if (global) { // broadcast change to all web session that uses related interpreter.
+
+ if (global) { // broadcast change to all web session that uses related
+ // interpreter.
for (Note n : notebook.getAllNotes()) {
- List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
+ List<InterpreterSetting> settings = note.getNoteReplLoader()
+ .getInterpreterSettings();
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup() == null) {
continue;
}
-
+
if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
AngularObjectRegistry angularObjectRegistry = setting
.getInterpreterGroup().getAngularObjectRegistry();
- this.broadcast(n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
- .put("angularObject", ao)
- .put("interpreterGroupId", interpreterGroupId)
- .put("noteId", n.id()));
+ this.broadcast(
+ n.id(),
+ new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
+ .put("interpreterGroupId", interpreterGroupId)
+ .put("noteId", n.id()));
}
}
}
- } else { // broadcast to all web session for the note
+ } else { // broadcast to all web session for the note
this.broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao)
@@ -482,24 +491,25 @@ public class NotebookServer extends WebSocketServer implements
}
}
-
- private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
+ private void moveParagraph(NotebookSocket conn, Notebook notebook,
+ Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
}
- final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString());
+ final int newIndex = (int) Double.parseDouble(fromMessage.get("index")
+ .toString());
final Note note = notebook.getNote(getOpenNoteId(conn));
note.moveParagraph(paragraphId, newIndex);
note.persist();
broadcastNote(note);
}
- private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
- final int index = (int) Double.parseDouble(fromMessage.get("index").toString());
+ private void insertParagraph(NotebookSocket conn, Notebook notebook,
+ Message fromMessage) throws IOException {
+ final int index = (int) Double.parseDouble(fromMessage.get("index")
+ .toString());
final Note note = notebook.getNote(getOpenNoteId(conn));
note.insertParagraph(index);
@@ -507,9 +517,8 @@ public class NotebookServer extends WebSocketServer implements
broadcastNote(note);
}
-
- private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
+ private void cancelParagraph(NotebookSocket conn, Notebook notebook,
+ Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
@@ -520,8 +529,8 @@ public class NotebookServer extends WebSocketServer implements
p.abort();
}
- private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
+ private void runParagraph(NotebookSocket conn, Notebook notebook,
+ Message fromMessage) throws IOException {
final String paragraphId = (String) fromMessage.get("id");
if (paragraphId == null) {
return;
@@ -531,13 +540,16 @@ public class NotebookServer extends WebSocketServer implements
String text = (String) fromMessage.get("paragraph");
p.setText(text);
p.setTitle((String) fromMessage.get("title"));
- Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
+ Map<String, Object> params = (Map<String, Object>) fromMessage
+ .get("params");
p.settings.setParams(params);
- Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+ Map<String, Object> config = (Map<String, Object>) fromMessage
+ .get("config");
p.setConfig(config);
// if it's the last paragraph, let's add a new one
- boolean isTheLastParagraph = note.getLastParagraph().getId().equals(p.getId());
+ boolean isTheLastParagraph = note.getLastParagraph().getId()
+ .equals(p.getId());
if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) {
note.addParagraph();
}
@@ -546,12 +558,12 @@ public class NotebookServer extends WebSocketServer implements
try {
note.run(paragraphId);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
LOG.error("Exception from run", ex);
if (p != null) {
- p.setReturn(new InterpreterResult(
- InterpreterResult.Code.ERROR, ex.getMessage()), ex);
+ p.setReturn(
+ new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()),
+ ex);
p.setStatus(Status.ERROR);
}
}
@@ -572,12 +584,15 @@ public class NotebookServer extends WebSocketServer implements
@Override
public void onProgressUpdate(Job job, int progress) {
- notebookServer.broadcast(note.id(),
- new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress()));
+ notebookServer.broadcast(
+ note.id(),
+ new Message(OP.PROGRESS).put("id", job.getId()).put("progress",
+ job.progress()));
}
@Override
- public void beforeStatusChange(Job job, Status before, Status after) {}
+ public void beforeStatusChange(Job job, Status before, Status after) {
+ }
@Override
public void afterStatusChange(Job job, Status before, Status after) {
@@ -606,19 +621,22 @@ public class NotebookServer extends WebSocketServer implements
private void pong() {
}
- private void sendAllAngularObjects(Note note, WebSocket conn) {
- List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
+ private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
+ List<InterpreterSetting> settings = note.getNoteReplLoader()
+ .getInterpreterSettings();
if (settings == null || settings.size() == 0) {
return;
}
for (InterpreterSetting intpSetting : settings) {
- AngularObjectRegistry registry = intpSetting.getInterpreterGroup().getAngularObjectRegistry();
+ AngularObjectRegistry registry = intpSetting.getInterpreterGroup()
+ .getAngularObjectRegistry();
List<AngularObject> objects = registry.getAllWithGlobal(note.id());
for (AngularObject object : objects) {
conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
- .put("angularObject", object)
- .put("interpreterGroupId", intpSetting.getInterpreterGroup().getId())
+ .put("angularObject", object)
+ .put("interpreterGroupId",
+ intpSetting.getInterpreterGroup().getId())
.put("noteId", note.id())));
}
}
@@ -641,23 +659,25 @@ public class NotebookServer extends WebSocketServer implements
if (object.getNoteId() != null && !note.id().equals(object.getNoteId())) {
continue;
}
-
+
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
.getInterpreterSettings();
- if (intpSettings.isEmpty()) continue;
+ if (intpSettings.isEmpty())
+ continue;
for (InterpreterSetting setting : intpSettings) {
if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) {
- broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
- .put("angularObject", object)
- .put("interpreterGroupId", interpreterGroupId)
- .put("noteId", note.id()));
+ broadcast(
+ note.id(),
+ new Message(OP.ANGULAR_OBJECT_UPDATE)
+ .put("angularObject", object)
+ .put("interpreterGroupId", interpreterGroupId)
+ .put("noteId", note.id()));
}
}
- }
+ }
}
-
@Override
public void onRemove(String interpreterGroupId, String name, String noteId) {
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
new file mode 100644
index 0000000..aceea45
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.socket;
+
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+
+import org.eclipse.jetty.websocket.WebSocket;
+
+/**
+ * Notebook websocket that relays open, close and text-message events to its listener.
+ */
+public class NotebookSocket implements WebSocket.OnTextMessage{
+
+ private Connection connection;
+ private NotebookSocketListener listener;
+ private HttpServletRequest request;
+ private String protocol;
+
+
+ public NotebookSocket(HttpServletRequest req, String protocol,
+ NotebookSocketListener listener) {
+ this.listener = listener;
+ this.request = req;
+ this.protocol = protocol;
+ }
+
+ @Override
+ public void onClose(int closeCode, String message) {
+ listener.onClose(this, closeCode, message);
+ }
+
+ @Override
+ public void onOpen(Connection connection) {
+ this.connection = connection;
+ listener.onOpen(this);
+ }
+
+ @Override
+ public void onMessage(String message) {
+ listener.onMessage(this, message);
+ }
+
+
+ public HttpServletRequest getRequest() {
+ return request;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public void send(String serializeMessage) throws IOException {
+ connection.sendMessage(serializeMessage);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java
new file mode 100644
index 0000000..77fed6e
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.socket;
+
+/**
+ * NotebookSocket listener
+ */
+public interface NotebookSocketListener {
+ public void onClose(NotebookSocket socket, int code, String message);
+ public void onOpen(NotebookSocket socket);
+ public void onMessage(NotebookSocket socket, String message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java
deleted file mode 100644
index f44dc1f..0000000
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.socket;
-
-import java.io.IOException;
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.ExecutorService;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import org.java_websocket.SSLSocketChannel2;
-import org.java_websocket.server.DefaultSSLWebSocketServerFactory;
-
-/**
- * Extension of the java_websocket library's DefaultSslWebSocketServerFactory
- * to require client side authentication during the SSL handshake
- */
-public class SslWebSocketServerFactory
- extends DefaultSSLWebSocketServerFactory {
-
- protected boolean needClientAuth;
-
- public SslWebSocketServerFactory(SSLContext sslcontext) {
- super(sslcontext);
- initAttributes();
- }
-
- public SslWebSocketServerFactory(
- SSLContext sslcontext,
- ExecutorService exec) {
-
- super(sslcontext, exec);
- initAttributes();
- }
-
- protected void initAttributes() {
- this.needClientAuth = false;
- }
-
- @Override
- public ByteChannel wrapChannel(SocketChannel channel, SelectionKey key)
- throws IOException {
-
- SSLEngine sslEngine = sslcontext.createSSLEngine();
- sslEngine.setUseClientMode(false);
- sslEngine.setNeedClientAuth(needClientAuth);
- return new SSLSocketChannel2( channel, sslEngine, exec, key );
- }
-
- public boolean getNeedClientAuth() {
- return needClientAuth;
- }
-
- public void setNeedClientAuth(boolean needClientAuth) {
- this.needClientAuth = needClientAuth;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java
index 779396c..b170a95 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java
@@ -150,6 +150,20 @@ public class ZeppelinIT {
return null != System.getenv("CI");
}
+ boolean waitForText(final String txt, final By by) {
+ try {
+ new WebDriverWait(driver, 5).until(new ExpectedCondition<Boolean>() {
+ @Override
+ public Boolean apply(WebDriver d) {
+ return txt.equals(driver.findElement(by).getText());
+ }
+ });
+ return true;
+ } catch (TimeoutException e) {
+ return false;
+ }
+ }
+
@Test
public void testAngularDisplay() throws InterruptedException{
if (!endToEndTestEnabled()) {
@@ -176,8 +190,8 @@ public class ZeppelinIT {
waitForParagraph(1, "FINISHED");
// check expected text
- assertEquals("BindingTest__", driver.findElement(By.xpath(
- getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText());
+ waitForText("BindingTest__", By.xpath(
+ getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
/*
* Bind variable
@@ -190,8 +204,8 @@ public class ZeppelinIT {
waitForParagraph(2, "FINISHED");
// check expected text
- assertEquals("BindingTest_1_", driver.findElement(By.xpath(
- getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText());
+ waitForText("BindingTest_1_", By.xpath(
+ getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
/*
@@ -206,8 +220,8 @@ public class ZeppelinIT {
waitForParagraph(3, "FINISHED");
// check expected text
- assertEquals("myVar=1", driver.findElement(By.xpath(
- getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")).getText());
+ waitForText("myVar=1", By.xpath(
+ getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]"));
/*
* Click element
@@ -216,8 +230,8 @@ public class ZeppelinIT {
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click();
// check expected text
- assertEquals("BindingTest_2_", driver.findElement(By.xpath(
- getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText());
+ waitForText("BindingTest_2_", By.xpath(
+ getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
/*
* Register watcher
@@ -242,13 +256,13 @@ public class ZeppelinIT {
getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click();
// check expected text
- assertEquals("BindingTest_3_", driver.findElement(By.xpath(
- getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText());
+ waitForText("BindingTest_3_", By.xpath(
+ getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
waitForParagraph(3, "FINISHED");
// check expected text by watcher
- assertEquals("myVar=3", driver.findElement(By.xpath(
- getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")).getText());
+ waitForText("myVar=3", By.xpath(
+ getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]"));
/*
* Unbind
@@ -261,8 +275,8 @@ public class ZeppelinIT {
waitForParagraph(5, "FINISHED");
// check expected text
- assertEquals("BindingTest__", driver.findElement(By.xpath(
- getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText());
+ waitForText("BindingTest__",
+ By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
/*
* Bind again and see rebind works.
@@ -272,8 +286,8 @@ public class ZeppelinIT {
waitForParagraph(2, "FINISHED");
// check expected text
- assertEquals("BindingTest_1_", driver.findElement(By.xpath(
- getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText());
+ waitForText("BindingTest_1_",
+ By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]"));
System.out.println("testCreateNotebook Test executed");
}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 744c1e0..393dc7b 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -187,7 +187,6 @@ public abstract class AbstractTestRestApi {
protected static void shutDown() throws Exception {
if (!wasRunning) {
LOG.info("Terminating test Zeppelin...");
- ZeppelinServer.notebookServer.stop();
ZeppelinServer.jettyServer.stop();
executor.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-web/src/components/baseUrl/baseUrl.service.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/components/baseUrl/baseUrl.service.js b/zeppelin-web/src/components/baseUrl/baseUrl.service.js
index 662d88f..f5eb2df 100644
--- a/zeppelin-web/src/components/baseUrl/baseUrl.service.js
+++ b/zeppelin-web/src/components/baseUrl/baseUrl.service.js
@@ -15,52 +15,32 @@
angular.module('zeppelinWebApp').service('baseUrlSrv', function() {
- /** Get the current port of the websocket
- *
- * When running Zeppelin, the body of this function will be dynamically
- * overridden with the AppScriptServlet from zeppelin-site.xml config value.
- *
- * If the config value is not defined, it defaults to the HTTP port + 1
- *
- * In the case of running "grunt serve", this function will appear
- * as is.
- */
-
- /* @preserve AppScriptServlet - getPort */
this.getPort = function() {
var port = Number(location.port);
- if (location.protocol !== 'https:' && !port) {
- port = 80;
- } else if (location.protocol === 'https:' && !port) {
- port = 443;
- } else if (port === 3333 || port === 9000) {
- port = 8080;
- }
- return port + 1;
- };
- /* @preserve AppScriptServlet - close */
-
- this.getWebsocketProtocol = function() {
- return location.protocol === 'https:' ? 'wss' : 'ws';
- };
-
- this.getRestApiBase = function() {
- var port = Number(location.port);
if (!port) {
port = 80;
if (location.protocol === 'https:') {
port = 443;
}
}
-
+ //Exception for when running locally via grunt
if (port === 3333 || port === 9000) {
port = 8080;
}
- return location.protocol + '//' + location.hostname + ':' + port + skipTrailingSlash(location.pathname) + '/api';
+ return port;
+ };
+
+ this.getWebsocketUrl = function() {
+ var wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
+ return wsProtocol + '//' + location.hostname + ':' + this.getPort() + '/ws';
};
-
+
+ this.getRestApiBase = function() {
+ return location.protocol + '//' + location.hostname + ':' + this.getPort() + skipTrailingSlash(location.pathname) + '/api';
+ };
+
var skipTrailingSlash = function(path) {
return path.replace(/\/$/, '');
};
-});
\ No newline at end of file
+});
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
index 731266f..6d9f177 100644
--- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
+++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js
@@ -16,7 +16,7 @@
angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $websocket, baseUrlSrv) {
var websocketCalls = {};
- websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketProtocol() + '://' + location.hostname + ':' + baseUrlSrv.getPort());
+ websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketUrl());
websocketCalls.ws.onOpen(function() {
console.log('Websocket created');
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 086c15e..223dc70 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -268,25 +268,6 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getInt(ConfVars.ZEPPELIN_PORT);
}
- public String getWebSocketAddress() {
- return getString(ConfVars.ZEPPELIN_WEBSOCKET_ADDR);
- }
-
- public int getWebSocketPort() {
- int port = getInt(ConfVars.ZEPPELIN_WEBSOCKET_PORT);
- int serverPort = getServerPort();
-
- if (port < 0) {
- if (serverPort <= 0) {
- return 0;
- } else {
- return serverPort + 1;
- }
- } else {
- return port;
- }
- }
-
public String getKeyStorePath() {
return getRelativeDir(
String.format("%s/%s",
@@ -389,9 +370,6 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_HOME("zeppelin.home", "../"),
ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"),
ZEPPELIN_PORT("zeppelin.server.port", 8080),
- // negative websocket port denotes that server port + 1 should be used
- ZEPPELIN_WEBSOCKET_ADDR("zeppelin.websocket.addr", "0.0.0.0"),
- ZEPPELIN_WEBSOCKET_PORT("zeppelin.websocket.port", -1),
ZEPPELIN_SSL("zeppelin.ssl", false),
ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false),
ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"),