You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by bz...@apache.org on 2015/12/22 08:29:35 UTC
incubator-zeppelin git commit: ZEPPELIN-312: fix a bug with blocking
websocket broadcast
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master 16e921b6a -> 4c269e6d8
ZEPPELIN-312: fix a bug with blocking websocket broadcast
### What is this PR for?
This PR replaces the synchronized critical section around the collection of connected sockets with a lock-free collection implementation, `java.util.concurrent.ConcurrentLinkedQueue`.
Synchronization was used only to avoid concurrent modification of the collection; the `.sendMessage()` calls in the Jetty implementation of WebSockets are themselves thread-safe and can proceed concurrently.
### What type of PR is it?
Bug Fix
### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-312
### How should this be tested?
See JIRA
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Alexander Bezzubov <bz...@apache.org>
Closes #558 from bzz/fix/zeppelin-312-blocking-broadcast and squashes the following commits:
bbbf8ae [Alexander Bezzubov] ZEPPELIN-312: refactoring ZeppelinServer to better Java style naming conventions
497a6ca [Alexander Bezzubov] ZEPPELIN-312: replace sync \w lock-free collection
524c401 [Alexander Bezzubov] ZEPPELIN-312: refactoring ZeppelinServer to adhere Java naming conventions
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/4c269e6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/4c269e6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/4c269e6d
Branch: refs/heads/master
Commit: 4c269e6d860320e2612eb6b77785c6d1ff3ef106
Parents: 16e921b
Author: Alexander Bezzubov <bz...@apache.org>
Authored: Tue Dec 22 13:07:43 2015 +0900
Committer: Alexander Bezzubov <bz...@apache.org>
Committed: Tue Dec 22 16:29:21 2015 +0900
----------------------------------------------------------------------
.../apache/zeppelin/server/ZeppelinServer.java | 89 +++++++++-----------
.../apache/zeppelin/socket/NotebookServer.java | 32 +++----
.../zeppelin/rest/AbstractTestRestApi.java | 10 ++-
.../zeppelin/socket/NotebookServerTest.java | 2 +-
.../zeppelin/conf/ZeppelinConfiguration.java | 6 +-
5 files changed, 64 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index e0e4a5d..7286b35 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -59,54 +59,65 @@ import org.slf4j.LoggerFactory;
* Main class of Zeppelin.
*
*/
-
public class ZeppelinServer extends Application {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class);
- private SchedulerFactory schedulerFactory;
public static Notebook notebook;
- public static NotebookServer notebookServer;
- public static Server jettyServer;
+ public static Server jettyWebServer;
+ public static NotebookServer notebookWsServer;
+ private SchedulerFactory schedulerFactory;
private InterpreterFactory replFactory;
private NotebookRepo notebookRepo;
- public static void main(String[] args) throws Exception {
+ public ZeppelinServer() throws Exception {
+ LOG.info("Constructor starteds");
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
- conf.setProperty("args", args);
- jettyServer = setupJettyServer(conf);
+ this.schedulerFactory = new SchedulerFactory();
+ this.replFactory = new InterpreterFactory(conf, notebookWsServer);
+ this.notebookRepo = new NotebookRepoSync(conf);
+
+ notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookWsServer);
+ LOG.info("Constructor finished");
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+ ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+ conf.setProperty("args", args);
// REST api
- final ServletContextHandler restApi = setupRestApiContextHandler(conf);
+ final ServletContextHandler restApiContext = setupRestApiContextHandler(conf);
// Notebook server
- final ServletContextHandler notebook = setupNotebookServer(conf);
+ final ServletContextHandler notebookContext = setupNotebookServer(conf);
// Web UI
final WebAppContext webApp = setupWebAppContext(conf);
// add all handlers
ContextHandlerCollection contexts = new ContextHandlerCollection();
- contexts.setHandlers(new Handler[]{restApi, notebook, webApp});
- jettyServer.setHandler(contexts);
+ contexts.setHandlers(new Handler[]{restApiContext, notebookContext, webApp});
- LOG.info("Start zeppelin server");
+ jettyWebServer = setupJettyServer(conf);
+ jettyWebServer.setHandler(contexts);
+
+ LOG.info("Starting zeppelin server");
try {
- jettyServer.start();
+ jettyWebServer.start(); //Instantiates ZeppelinServer
} catch (Exception e) {
LOG.error("Error while running jettyServer", e);
System.exit(-1);
}
- LOG.info("Started zeppelin server");
+ LOG.info("Done, zeppelin server started");
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override public void run() {
LOG.info("Shutting down Zeppelin Server ... ");
try {
- jettyServer.stop();
- ZeppelinServer.notebook.getInterpreterFactory().close();
- ZeppelinServer.notebook.close();
+ jettyWebServer.stop();
+ notebook.getInterpreterFactory().close();
+ notebook.close();
} catch (Exception e) {
LOG.error("Error while stopping servlet container", e);
}
@@ -125,18 +136,15 @@ public class ZeppelinServer extends Application {
System.exit(0);
}
- jettyServer.join();
+ jettyWebServer.join();
ZeppelinServer.notebook.getInterpreterFactory().close();
}
- private static Server setupJettyServer(ZeppelinConfiguration conf)
- throws Exception {
-
+ private static Server setupJettyServer(ZeppelinConfiguration conf) {
AbstractConnector connector;
if (conf.useSsl()) {
connector = new SslSelectChannelConnector(getSslContextFactory(conf));
- }
- else {
+ } else {
connector = new SelectChannelConnector();
}
@@ -153,11 +161,9 @@ public class ZeppelinServer extends Application {
return server;
}
- private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf)
- throws Exception {
-
- notebookServer = new NotebookServer();
- final ServletHolder servletHolder = new ServletHolder(notebookServer);
+ private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) {
+ notebookWsServer = new NotebookServer();
+ final ServletHolder servletHolder = new ServletHolder(notebookWsServer);
servletHolder.setInitParameter("maxTextMessageSize", "1024000");
final ServletContextHandler cxfContext = new ServletContextHandler(
@@ -171,9 +177,8 @@ public class ZeppelinServer extends Application {
return cxfContext;
}
- private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf)
- throws Exception {
-
+ @SuppressWarnings("deprecation")
+ private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) {
// Note that the API for the SslContextFactory is different for
// Jetty version 9
SslContextFactory sslContextFactory = new SslContextFactory();
@@ -194,6 +199,7 @@ public class ZeppelinServer extends Application {
return sslContextFactory;
}
+ @SuppressWarnings("unused") //TODO(bzz) why unused?
private static SSLContext getSslContext(ZeppelinConfiguration conf)
throws Exception {
@@ -240,23 +246,10 @@ public class ZeppelinServer extends Application {
webApp.setTempDirectory(warTempDirectory);
}
// Explicit bind to root
- webApp.addServlet(
- new ServletHolder(new DefaultServlet()),
- "/*"
- );
+ webApp.addServlet(new ServletHolder(new DefaultServlet()), "/*");
return webApp;
}
- public ZeppelinServer() throws Exception {
- ZeppelinConfiguration conf = ZeppelinConfiguration.create();
-
- this.schedulerFactory = new SchedulerFactory();
-
- this.replFactory = new InterpreterFactory(conf, notebookServer);
- this.notebookRepo = new NotebookRepoSync(conf);
- notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer);
- }
-
@Override
public Set<Class<?>> getClasses() {
Set<Class<?>> classes = new HashSet<Class<?>>();
@@ -264,14 +257,14 @@ public class ZeppelinServer extends Application {
}
@Override
- public java.util.Set<java.lang.Object> getSingletons() {
- Set<Object> singletons = new HashSet<Object>();
+ public Set<Object> getSingletons() {
+ Set<Object> singletons = new HashSet<>();
/** Rest-api root endpoint */
ZeppelinRestApi root = new ZeppelinRestApi();
singletons.add(root);
- NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer);
+ NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer);
singletons.add(notebookApi);
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 114582f..a010e58 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -19,6 +19,8 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import javax.servlet.http.HttpServletRequest;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -44,12 +46,12 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.google.common.base.Strings;
import com.google.gson.Gson;
/**
* Zeppelin websocket service.
*
- * @author anthonycorbacho
*/
public class NotebookServer extends WebSocketServlet implements
NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener {
@@ -57,7 +59,7 @@ public class NotebookServer extends WebSocketServlet implements
.getLogger(NotebookServer.class);
Gson gson = new Gson();
final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
- final List<NotebookSocket> connectedSockets = new LinkedList<>();
+ final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
private Notebook notebook() {
return ZeppelinServer.notebook;
@@ -85,9 +87,7 @@ public class NotebookServer extends WebSocketServlet implements
public void onOpen(NotebookSocket conn) {
LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
conn.getRequest().getRemotePort());
- synchronized (connectedSockets) {
- connectedSockets.add(conn);
- }
+ connectedSockets.add(conn);
}
@Override
@@ -147,8 +147,7 @@ public class NotebookServer extends WebSocketServlet implements
completion(conn, notebook, messagereceived);
break;
case PING:
- pong();
- break;
+ break; //do nothing
case ANGULAR_OBJECT_UPDATED:
angularObjectUpdated(conn, notebook, messagereceived);
break;
@@ -166,9 +165,7 @@ public class NotebookServer extends WebSocketServlet implements
LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest()
.getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
removeConnectionFromAllNote(conn);
- synchronized (connectedSockets) {
- connectedSockets.remove(conn);
- }
+ connectedSockets.remove(conn);
}
protected Message deserializeMessage(String msg) {
@@ -285,13 +282,11 @@ public class NotebookServer extends WebSocketServlet implements
}
private void broadcastAll(Message m) {
- synchronized (connectedSockets) {
- for (NotebookSocket conn : connectedSockets) {
- try {
- conn.send(serializeMessage(m));
- } catch (IOException e) {
- LOG.error("socket error", e);
- }
+ for (NotebookSocket conn : connectedSockets) {
+ try {
+ conn.send(serializeMessage(m));
+ } catch (IOException e) {
+ LOG.error("socket error", e);
}
}
}
@@ -730,6 +725,7 @@ public class NotebookServer extends WebSocketServlet implements
public static class ParagraphJobListener implements JobListener {
private NotebookServer notebookServer;
private Note note;
+
public ParagraphJobListener(NotebookServer notebookServer, Note note) {
this.notebookServer = notebookServer;
this.note = note;
@@ -771,8 +767,6 @@ public class NotebookServer extends WebSocketServlet implements
public JobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener(this, note);
}
- private void pong() {
- }
private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
List<InterpreterSetting> settings = note.getNoteReplLoader()
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index db7affe..69d1022 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -29,8 +29,12 @@ import java.util.concurrent.Executors;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethodBase;
-import org.apache.commons.httpclient.methods.*;
-import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterSetting;
@@ -207,7 +211,7 @@ public abstract class AbstractTestRestApi {
}
LOG.info("Terminating test Zeppelin...");
- ZeppelinServer.jettyServer.stop();
+ ZeppelinServer.jettyWebServer.stop();
executor.shutdown();
long s = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index faef287..67d12b7 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -60,7 +60,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
AbstractTestRestApi.startUp();
gson = new Gson();
notebook = ZeppelinServer.notebook;
- notebookServer = ZeppelinServer.notebookServer;
+ notebookServer = ZeppelinServer.notebookWsServer;
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 909345a..72b6a3c 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -18,12 +18,12 @@
package org.apache.zeppelin.conf;
import java.net.URL;
-import java.util.*;
+import java.util.Arrays;
+import java.util.List;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.configuration.tree.ConfigurationNode;
-import org.apache.zeppelin.notebook.repo.S3NotebookRepo;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
/**
* Zeppelin configuration.
*
- * @author Leemoonsoo
- *
*/
public class ZeppelinConfiguration extends XMLConfiguration {
private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";