You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by bz...@apache.org on 2015/12/22 08:29:35 UTC

incubator-zeppelin git commit: ZEPPELIN-312: fix a bug with blocking websocket broadcast

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 16e921b6a -> 4c269e6d8


ZEPPELIN-312: fix a bug with blocking websocket broadcast

### What is this PR for?
Replaces the synchronized critical section around the collection of sockets with a lock-free collection implementation, `java.util.concurrent.ConcurrentLinkedQueue`.
The synchronization was used only to avoid concurrent modification of the collection; the `.sendMessage()` calls in the Jetty implementation of WebSockets are thread-safe and can proceed concurrently.

### What type of PR is it?
Bug Fix

### Is there a relevant Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-312

### How should this be tested?
See JIRA

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Alexander Bezzubov <bz...@apache.org>

Closes #558 from bzz/fix/zeppelin-312-blocking-broadcast and squashes the following commits:

bbbf8ae [Alexander Bezzubov] ZEPPELIN-312: refactoring ZeppelinServer to better Java style naming conventions
497a6ca [Alexander Bezzubov] ZEPPELIN-312: replace sync \w lock-free collection
524c401 [Alexander Bezzubov] ZEPPELIN-312: refactoring ZeppelinServer to adhere Java naming conventions


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/4c269e6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/4c269e6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/4c269e6d

Branch: refs/heads/master
Commit: 4c269e6d860320e2612eb6b77785c6d1ff3ef106
Parents: 16e921b
Author: Alexander Bezzubov <bz...@apache.org>
Authored: Tue Dec 22 13:07:43 2015 +0900
Committer: Alexander Bezzubov <bz...@apache.org>
Committed: Tue Dec 22 16:29:21 2015 +0900

----------------------------------------------------------------------
 .../apache/zeppelin/server/ZeppelinServer.java  | 89 +++++++++-----------
 .../apache/zeppelin/socket/NotebookServer.java  | 32 +++----
 .../zeppelin/rest/AbstractTestRestApi.java      | 10 ++-
 .../zeppelin/socket/NotebookServerTest.java     |  2 +-
 .../zeppelin/conf/ZeppelinConfiguration.java    |  6 +-
 5 files changed, 64 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index e0e4a5d..7286b35 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -59,54 +59,65 @@ import org.slf4j.LoggerFactory;
  * Main class of Zeppelin.
  *
  */
-
 public class ZeppelinServer extends Application {
   private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class);
 
-  private SchedulerFactory schedulerFactory;
   public static Notebook notebook;
-  public static NotebookServer notebookServer;
-  public static Server jettyServer;
+  public static Server jettyWebServer;
+  public static NotebookServer notebookWsServer;
 
+  private SchedulerFactory schedulerFactory;
   private InterpreterFactory replFactory;
   private NotebookRepo notebookRepo;
 
-  public static void main(String[] args) throws Exception {
+  public ZeppelinServer() throws Exception {
+    LOG.info("Constructor starteds");
     ZeppelinConfiguration conf = ZeppelinConfiguration.create();
-    conf.setProperty("args", args);
 
-    jettyServer = setupJettyServer(conf);
+    this.schedulerFactory = new SchedulerFactory();
+    this.replFactory = new InterpreterFactory(conf, notebookWsServer);
+    this.notebookRepo = new NotebookRepoSync(conf);
+
+    notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookWsServer);
+    LOG.info("Constructor finished");
+  }
+
+  public static void main(String[] args) throws InterruptedException {
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    conf.setProperty("args", args);
 
     // REST api
-    final ServletContextHandler restApi = setupRestApiContextHandler(conf);
+    final ServletContextHandler restApiContext = setupRestApiContextHandler(conf);
 
     // Notebook server
-    final ServletContextHandler notebook = setupNotebookServer(conf);
+    final ServletContextHandler notebookContext = setupNotebookServer(conf);
 
     // Web UI
     final WebAppContext webApp = setupWebAppContext(conf);
 
     // add all handlers
     ContextHandlerCollection contexts = new ContextHandlerCollection();
-    contexts.setHandlers(new Handler[]{restApi, notebook, webApp});
-    jettyServer.setHandler(contexts);
+    contexts.setHandlers(new Handler[]{restApiContext, notebookContext, webApp});
 
-    LOG.info("Start zeppelin server");
+    jettyWebServer = setupJettyServer(conf);
+    jettyWebServer.setHandler(contexts);
+
+    LOG.info("Starting zeppelin server");
     try {
-      jettyServer.start();
+      jettyWebServer.start(); //Instantiates ZeppelinServer
     } catch (Exception e) {
       LOG.error("Error while running jettyServer", e);
       System.exit(-1);
     }
-    LOG.info("Started zeppelin server");
+    LOG.info("Done, zeppelin server started");
 
     Runtime.getRuntime().addShutdownHook(new Thread(){
       @Override public void run() {
         LOG.info("Shutting down Zeppelin Server ... ");
         try {
-          jettyServer.stop();
-          ZeppelinServer.notebook.getInterpreterFactory().close();
-          ZeppelinServer.notebook.close();
+          jettyWebServer.stop();
+          notebook.getInterpreterFactory().close();
+          notebook.close();
         } catch (Exception e) {
           LOG.error("Error while stopping servlet container", e);
         }
@@ -125,18 +136,15 @@ public class ZeppelinServer extends Application {
       System.exit(0);
     }
 
-    jettyServer.join();
+    jettyWebServer.join();
     ZeppelinServer.notebook.getInterpreterFactory().close();
   }
 
-  private static Server setupJettyServer(ZeppelinConfiguration conf)
-      throws Exception {
-
+  private static Server setupJettyServer(ZeppelinConfiguration conf) {
     AbstractConnector connector;
     if (conf.useSsl()) {
       connector = new SslSelectChannelConnector(getSslContextFactory(conf));
-    }
-    else {
+    } else {
       connector = new SelectChannelConnector();
     }
 
@@ -153,11 +161,9 @@ public class ZeppelinServer extends Application {
     return server;
   }
 
-  private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf)
-      throws Exception {
-
-    notebookServer = new NotebookServer();
-    final ServletHolder servletHolder = new ServletHolder(notebookServer);
+  private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) {
+    notebookWsServer = new NotebookServer();
+    final ServletHolder servletHolder = new ServletHolder(notebookWsServer);
     servletHolder.setInitParameter("maxTextMessageSize", "1024000");
 
     final ServletContextHandler cxfContext = new ServletContextHandler(
@@ -171,9 +177,8 @@ public class ZeppelinServer extends Application {
     return cxfContext;
   }
 
-  private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf)
-      throws Exception {
-
+  @SuppressWarnings("deprecation")
+  private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) {
     // Note that the API for the SslContextFactory is different for
     // Jetty version 9
     SslContextFactory sslContextFactory = new SslContextFactory();
@@ -194,6 +199,7 @@ public class ZeppelinServer extends Application {
     return sslContextFactory;
   }
 
+  @SuppressWarnings("unused") //TODO(bzz) why unused?
   private static SSLContext getSslContext(ZeppelinConfiguration conf)
       throws Exception {
 
@@ -240,23 +246,10 @@ public class ZeppelinServer extends Application {
       webApp.setTempDirectory(warTempDirectory);
     }
     // Explicit bind to root
-    webApp.addServlet(
-      new ServletHolder(new DefaultServlet()),
-      "/*"
-    );
+    webApp.addServlet(new ServletHolder(new DefaultServlet()), "/*");
     return webApp;
   }
 
-  public ZeppelinServer() throws Exception {
-    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
-
-    this.schedulerFactory = new SchedulerFactory();
-
-    this.replFactory = new InterpreterFactory(conf, notebookServer);
-    this.notebookRepo = new NotebookRepoSync(conf);
-    notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer);
-  }
-
   @Override
   public Set<Class<?>> getClasses() {
     Set<Class<?>> classes = new HashSet<Class<?>>();
@@ -264,14 +257,14 @@ public class ZeppelinServer extends Application {
   }
 
   @Override
-  public java.util.Set<java.lang.Object> getSingletons() {
-    Set<Object> singletons = new HashSet<Object>();
+  public Set<Object> getSingletons() {
+    Set<Object> singletons = new HashSet<>();
 
     /** Rest-api root endpoint */
     ZeppelinRestApi root = new ZeppelinRestApi();
     singletons.add(root);
 
-    NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer);
+    NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer);
     singletons.add(notebookApi);
 
     InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 114582f..a010e58 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -19,6 +19,8 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 import javax.servlet.http.HttpServletRequest;
 
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -44,12 +46,12 @@ import org.eclipse.jetty.websocket.WebSocketServlet;
 import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Strings;
 import com.google.gson.Gson;
 /**
  * Zeppelin websocket service.
  *
- * @author anthonycorbacho
  */
 public class NotebookServer extends WebSocketServlet implements
         NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener {
@@ -57,7 +59,7 @@ public class NotebookServer extends WebSocketServlet implements
           .getLogger(NotebookServer.class);
   Gson gson = new Gson();
   final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>();
-  final List<NotebookSocket> connectedSockets = new LinkedList<>();
+  final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
 
   private Notebook notebook() {
     return ZeppelinServer.notebook;
@@ -85,9 +87,7 @@ public class NotebookServer extends WebSocketServlet implements
   public void onOpen(NotebookSocket conn) {
     LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(),
         conn.getRequest().getRemotePort());
-    synchronized (connectedSockets) {
-      connectedSockets.add(conn);
-    }
+    connectedSockets.add(conn);
   }
 
   @Override
@@ -147,8 +147,7 @@ public class NotebookServer extends WebSocketServlet implements
             completion(conn, notebook, messagereceived);
             break;
           case PING:
-            pong();
-            break;
+            break; //do nothing
           case ANGULAR_OBJECT_UPDATED:
             angularObjectUpdated(conn, notebook, messagereceived);
             break;
@@ -166,9 +165,7 @@ public class NotebookServer extends WebSocketServlet implements
     LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest()
         .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason);
     removeConnectionFromAllNote(conn);
-    synchronized (connectedSockets) {
-      connectedSockets.remove(conn);
-    }
+    connectedSockets.remove(conn);
   }
 
   protected Message deserializeMessage(String msg) {
@@ -285,13 +282,11 @@ public class NotebookServer extends WebSocketServlet implements
   }
 
   private void broadcastAll(Message m) {
-    synchronized (connectedSockets) {
-      for (NotebookSocket conn : connectedSockets) {
-        try {
-          conn.send(serializeMessage(m));
-        } catch (IOException e) {
-          LOG.error("socket error", e);
-        }
+    for (NotebookSocket conn : connectedSockets) {
+      try {
+        conn.send(serializeMessage(m));
+      } catch (IOException e) {
+        LOG.error("socket error", e);
       }
     }
   }
@@ -730,6 +725,7 @@ public class NotebookServer extends WebSocketServlet implements
   public static class ParagraphJobListener implements JobListener {
     private NotebookServer notebookServer;
     private Note note;
+
     public ParagraphJobListener(NotebookServer notebookServer, Note note) {
       this.notebookServer = notebookServer;
       this.note = note;
@@ -771,8 +767,6 @@ public class NotebookServer extends WebSocketServlet implements
   public JobListener getParagraphJobListener(Note note) {
     return new ParagraphJobListener(this, note);
   }
-  private void pong() {
-  }
 
   private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException {
     List<InterpreterSetting> settings = note.getNoteReplLoader()

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index db7affe..69d1022 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -29,8 +29,12 @@ import java.util.concurrent.Executors;
 
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpMethodBase;
-import org.apache.commons.httpclient.methods.*;
-import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
@@ -207,7 +211,7 @@ public abstract class AbstractTestRestApi {
       }
 
       LOG.info("Terminating test Zeppelin...");
-      ZeppelinServer.jettyServer.stop();
+      ZeppelinServer.jettyWebServer.stop();
       executor.shutdown();
 
       long s = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index faef287..67d12b7 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -60,7 +60,7 @@ public class NotebookServerTest extends AbstractTestRestApi {
     AbstractTestRestApi.startUp();
     gson = new Gson();
     notebook = ZeppelinServer.notebook;
-    notebookServer = ZeppelinServer.notebookServer;
+    notebookServer = ZeppelinServer.notebookWsServer;
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 909345a..72b6a3c 100755
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -18,12 +18,12 @@
 package org.apache.zeppelin.conf;
 
 import java.net.URL;
-import java.util.*;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.configuration.tree.ConfigurationNode;
-import org.apache.zeppelin.notebook.repo.S3NotebookRepo;
 import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
 /**
  * Zeppelin configuration.
  *
- * @author Leemoonsoo
- *
  */
 public class ZeppelinConfiguration extends XMLConfiguration {
   private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";