You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/04/06 06:05:56 UTC
[05/17] incubator-zeppelin git commit: Rename package/groupId to
org.apache and apply rat plugin.
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java
deleted file mode 100644
index fd9960b..0000000
--- a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/NotebookServer.java
+++ /dev/null
@@ -1,483 +0,0 @@
-package com.nflabs.zeppelin.socket;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.nflabs.zeppelin.interpreter.InterpreterResult;
-import org.java_websocket.WebSocket;
-import org.java_websocket.handshake.ClientHandshake;
-import org.java_websocket.server.WebSocketServer;
-import org.quartz.SchedulerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Strings;
-import com.google.gson.Gson;
-import com.nflabs.zeppelin.notebook.JobListenerFactory;
-import com.nflabs.zeppelin.notebook.Note;
-import com.nflabs.zeppelin.notebook.Notebook;
-import com.nflabs.zeppelin.notebook.Paragraph;
-import com.nflabs.zeppelin.scheduler.Job;
-import com.nflabs.zeppelin.scheduler.Job.Status;
-import com.nflabs.zeppelin.scheduler.JobListener;
-import com.nflabs.zeppelin.server.ZeppelinServer;
-import com.nflabs.zeppelin.socket.Message.OP;
-
-/**
- * Zeppelin websocket service.
- *
- * @author anthonycorbacho
- */
-public class NotebookServer extends WebSocketServer implements JobListenerFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
- private static final int DEFAULT_PORT = 8282;
-
- private static void creatingwebSocketServerLog(int port) {
- LOG.info("Create zeppelin websocket on port {}", port);
- }
-
- Gson gson = new Gson();
- Map<String, List<WebSocket>> noteSocketMap = new HashMap<String, List<WebSocket>>();
- List<WebSocket> connectedSockets = new LinkedList<WebSocket>();
-
- public NotebookServer() {
- super(new InetSocketAddress(DEFAULT_PORT));
- creatingwebSocketServerLog(DEFAULT_PORT);
- }
-
- public NotebookServer(int port) {
- super(new InetSocketAddress(port));
- creatingwebSocketServerLog(port);
- }
-
- private Notebook notebook() {
- return ZeppelinServer.notebook;
- }
-
- @Override
- public void onOpen(WebSocket conn, ClientHandshake handshake) {
- LOG.info("New connection from {} : {}", conn.getRemoteSocketAddress().getHostName(), conn
- .getRemoteSocketAddress().getPort());
- synchronized (connectedSockets) {
- connectedSockets.add(conn);
- }
- }
-
- @Override
- public void onMessage(WebSocket conn, String msg) {
- Notebook notebook = notebook();
- try {
- Message messagereceived = deserializeMessage(msg);
- LOG.info("RECEIVE << " + messagereceived.op);
- /** Lets be elegant here */
- switch (messagereceived.op) {
- case LIST_NOTES:
- broadcastNoteList();
- break;
- case GET_NOTE:
- sendNote(conn, notebook, messagereceived);
- break;
- case NEW_NOTE:
- createNote(conn, notebook);
- break;
- case DEL_NOTE:
- removeNote(conn, notebook, messagereceived);
- break;
- case COMMIT_PARAGRAPH:
- updateParagraph(conn, notebook, messagereceived);
- break;
- case RUN_PARAGRAPH:
- runParagraph(conn, notebook, messagereceived);
- break;
- case CANCEL_PARAGRAPH:
- cancelParagraph(conn, notebook, messagereceived);
- break;
- case MOVE_PARAGRAPH:
- moveParagraph(conn, notebook, messagereceived);
- break;
- case INSERT_PARAGRAPH:
- insertParagraph(conn, notebook, messagereceived);
- break;
- case PARAGRAPH_REMOVE:
- removeParagraph(conn, notebook, messagereceived);
- break;
- case NOTE_UPDATE:
- updateNote(conn, notebook, messagereceived);
- break;
- case COMPLETION:
- completion(conn, notebook, messagereceived);
- break;
- default:
- broadcastNoteList();
- break;
- }
- } catch (Exception e) {
- LOG.error("Can't handle message", e);
- }
- }
-
- @Override
- public void onClose(WebSocket conn, int code, String reason, boolean remote) {
- LOG.info("Closed connection to {} : {}", conn.getRemoteSocketAddress().getHostName(), conn
- .getRemoteSocketAddress().getPort());
- removeConnectionFromAllNote(conn);
- synchronized (connectedSockets) {
- connectedSockets.remove(conn);
- }
- }
-
- @Override
- public void onError(WebSocket conn, Exception message) {
- removeConnectionFromAllNote(conn);
- synchronized (connectedSockets) {
- connectedSockets.remove(conn);
- }
- }
-
- private Message deserializeMessage(String msg) {
- Message m = gson.fromJson(msg, Message.class);
- return m;
- }
-
- private String serializeMessage(Message m) {
- return gson.toJson(m);
- }
-
- private void addConnectionToNote(String noteId, WebSocket socket) {
- synchronized (noteSocketMap) {
- removeConnectionFromAllNote(socket); // make sure a socket relates only a single note.
- List<WebSocket> socketList = noteSocketMap.get(noteId);
- if (socketList == null) {
- socketList = new LinkedList<WebSocket>();
- noteSocketMap.put(noteId, socketList);
- }
-
- if (socketList.contains(socket) == false) {
- socketList.add(socket);
- }
- }
- }
-
- private void removeConnectionFromNote(String noteId, WebSocket socket) {
- synchronized (noteSocketMap) {
- List<WebSocket> socketList = noteSocketMap.get(noteId);
- if (socketList != null) {
- socketList.remove(socket);
- }
- }
- }
-
- private void removeNote(String noteId) {
- synchronized (noteSocketMap) {
- List<WebSocket> socketList = noteSocketMap.remove(noteId);
- }
- }
-
- private void removeConnectionFromAllNote(WebSocket socket) {
- synchronized (noteSocketMap) {
- Set<String> keys = noteSocketMap.keySet();
- for (String noteId : keys) {
- removeConnectionFromNote(noteId, socket);
- }
- }
- }
-
- private String getOpenNoteId(WebSocket socket) {
- String id = null;
- synchronized (noteSocketMap) {
- Set<String> keys = noteSocketMap.keySet();
- for (String noteId : keys) {
- List<WebSocket> sockets = noteSocketMap.get(noteId);
- if (sockets.contains(socket)) {
- id = noteId;
- }
- }
- }
- return id;
- }
-
- private void broadcast(String noteId, Message m) {
- LOG.info("SEND >> " + m.op);
- synchronized (noteSocketMap) {
- List<WebSocket> socketLists = noteSocketMap.get(noteId);
- if (socketLists == null || socketLists.size() == 0) {
- return;
- }
- for (WebSocket conn : socketLists) {
- conn.send(serializeMessage(m));
- }
- }
- }
-
- private void broadcastAll(Message m) {
- synchronized (connectedSockets) {
- for (WebSocket conn : connectedSockets) {
- conn.send(serializeMessage(m));
- }
- }
- }
-
- private void broadcastNote(Note note) {
- broadcast(note.id(), new Message(OP.NOTE).put("note", note));
- }
-
- private void broadcastNoteList() {
- Notebook notebook = notebook();
- List<Note> notes = notebook.getAllNotes();
- List<Map<String, String>> notesInfo = new LinkedList<Map<String, String>>();
- for (Note note : notes) {
- Map<String, String> info = new HashMap<String, String>();
- info.put("id", note.id());
- info.put("name", note.getName());
- notesInfo.add(info);
- }
- broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
- }
-
- private void sendNote(WebSocket conn, Notebook notebook, Message fromMessage) {
- String noteId = (String) fromMessage.get("id");
- if (noteId == null) {
- return;
- }
- Note note = notebook.getNote(noteId);
- if (note != null) {
- addConnectionToNote(note.id(), conn);
- conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
- }
- }
-
- private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage)
- throws SchedulerException, IOException {
- String noteId = (String) fromMessage.get("id");
- String name = (String) fromMessage.get("name");
- Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
- if (noteId == null) {
- return;
- }
- if (config == null) {
- return;
- }
- Note note = notebook.getNote(noteId);
- if (note != null) {
- boolean cronUpdated = isCronUpdated(config, note.getConfig());
- note.setName(name);
- note.setConfig(config);
-
- if (cronUpdated) {
- notebook.refreshCron(note.id());
- }
- note.persist();
-
- broadcastNote(note);
- broadcastNoteList();
- }
- }
-
- private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) {
- boolean cronUpdated = false;
- if (configA.get("cron") != null && configB.get("cron") != null
- && configA.get("cron").equals(configB.get("cron"))) {
- cronUpdated = true;
- } else if (configA.get("cron") == null && configB.get("cron") == null) {
- cronUpdated = false;
- } else if (configA.get("cron") != null || configB.get("cron") != null) {
- cronUpdated = true;
- }
- return cronUpdated;
- }
-
- private void createNote(WebSocket conn, Notebook notebook) throws IOException {
- Note note = notebook.createNote();
- note.addParagraph(); // it's an empty note. so add one paragraph
- note.persist();
- broadcastNote(note);
- broadcastNoteList();
- }
-
- private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
- String noteId = (String) fromMessage.get("id");
- if (noteId == null) {
- return;
- }
- Note note = notebook.getNote(noteId);
- note.unpersist();
- notebook.removeNote(noteId);
- removeNote(noteId);
- broadcastNoteList();
- }
-
- private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
- String paragraphId = (String) fromMessage.get("id");
- if (paragraphId == null) {
- return;
- }
- Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
- Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
- final Note note = notebook.getNote(getOpenNoteId(conn));
- Paragraph p = note.getParagraph(paragraphId);
- p.settings.setParams(params);
- p.setConfig(config);
- p.setTitle((String) fromMessage.get("title"));
- p.setText((String) fromMessage.get("paragraph"));
- note.persist();
- broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p));
- }
-
- private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
- final String paragraphId = (String) fromMessage.get("id");
- if (paragraphId == null) {
- return;
- }
- final Note note = notebook.getNote(getOpenNoteId(conn));
- /** We dont want to remove the last paragraph */
- if (!note.isLastParagraph(paragraphId)) {
- note.removeParagraph(paragraphId);
- note.persist();
- broadcastNote(note);
- }
- }
-
- private void completion(WebSocket conn, Notebook notebook, Message fromMessage) {
- String paragraphId = (String) fromMessage.get("id");
- String buffer = (String) fromMessage.get("buf");
- int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString());
- Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
-
- if (paragraphId == null) {
- conn.send(serializeMessage(resp));
- return;
- }
-
- final Note note = notebook.getNote(getOpenNoteId(conn));
- List<String> candidates = note.completion(paragraphId, buffer, cursor);
- resp.put("completions", candidates);
- conn.send(serializeMessage(resp));
- }
-
- private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
- final String paragraphId = (String) fromMessage.get("id");
- if (paragraphId == null) {
- return;
- }
-
- final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString());
- final Note note = notebook.getNote(getOpenNoteId(conn));
- note.moveParagraph(paragraphId, newIndex);
- note.persist();
- broadcastNote(note);
- }
-
- private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
- final int index = (int) Double.parseDouble(fromMessage.get("index").toString());
-
- final Note note = notebook.getNote(getOpenNoteId(conn));
- note.insertParagraph(index);
- note.persist();
- broadcastNote(note);
- }
-
-
- private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
- final String paragraphId = (String) fromMessage.get("id");
- if (paragraphId == null) {
- return;
- }
-
- final Note note = notebook.getNote(getOpenNoteId(conn));
- Paragraph p = note.getParagraph(paragraphId);
- p.abort();
- }
-
- private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
- throws IOException {
- final String paragraphId = (String) fromMessage.get("id");
- if (paragraphId == null) {
- return;
- }
- final Note note = notebook.getNote(getOpenNoteId(conn));
- Paragraph p = note.getParagraph(paragraphId);
- String text = (String) fromMessage.get("paragraph");
- p.setText(text);
- p.setTitle((String) fromMessage.get("title"));
- Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
- p.settings.setParams(params);
- Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
- p.setConfig(config);
-
- // if it's the last paragraph, let's add a new one
- boolean isTheLastParagraph = note.getLastParagraph().getId().equals(p.getId());
- if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) {
- note.addParagraph();
- }
- note.persist();
- broadcastNote(note);
-
- try {
- note.run(paragraphId);
- }
- catch (Exception ex) {
- LOG.error("Exception from run", ex);
- if (p != null) {
- p.setReturn(new InterpreterResult(
- InterpreterResult.Code.ERROR, ex.getMessage()), ex);
- p.setStatus(Status.ERROR);
- }
- }
- }
-
- /**
- * Need description here.
- *
- */
- public static class ParagraphJobListener implements JobListener {
- private NotebookServer notebookServer;
- private Note note;
-
- public ParagraphJobListener(NotebookServer notebookServer, Note note) {
- this.notebookServer = notebookServer;
- this.note = note;
- }
-
- @Override
- public void onProgressUpdate(Job job, int progress) {
- notebookServer.broadcast(note.id(),
- new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress()));
- }
-
- @Override
- public void beforeStatusChange(Job job, Status before, Status after) {}
-
- @Override
- public void afterStatusChange(Job job, Status before, Status after) {
- if (after == Status.ERROR) {
- job.getException().printStackTrace();
- }
- if (job.isTerminated()) {
- LOG.info("Job {} is finished", job.getId());
- try {
- note.persist();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- notebookServer.broadcastNote(note);
- }
- }
-
- @Override
- public JobListener getParagraphJobListener(Note note) {
- return new ParagraphJobListener(this, note);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/SslWebSocketServerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/SslWebSocketServerFactory.java b/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/SslWebSocketServerFactory.java
deleted file mode 100644
index b84e1d0..0000000
--- a/zeppelin-server/src/main/java/com/nflabs/zeppelin/socket/SslWebSocketServerFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.nflabs.zeppelin.socket;
-
-import org.java_websocket.server.DefaultSSLWebSocketServerFactory;
-import org.java_websocket.SSLSocketChannel2;
-
-import java.io.IOException;
-
-import java.nio.channels.ByteChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
-import java.util.concurrent.ExecutorService;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-/**
- * Extension of the java_websocket library's DefaultSslWebSocketServerFactory
- * to require client side authentication during the SSL handshake
- */
-public class SslWebSocketServerFactory
- extends DefaultSSLWebSocketServerFactory {
-
- protected boolean needClientAuth;
-
- public SslWebSocketServerFactory(SSLContext sslcontext) {
- super(sslcontext);
- initAttributes();
- }
-
- public SslWebSocketServerFactory(
- SSLContext sslcontext,
- ExecutorService exec) {
-
- super(sslcontext, exec);
- initAttributes();
- }
-
- protected void initAttributes() {
- this.needClientAuth = false;
- }
-
- @Override
- public ByteChannel wrapChannel(SocketChannel channel, SelectionKey key)
- throws IOException {
-
- SSLEngine sslEngine = sslcontext.createSSLEngine();
- sslEngine.setUseClientMode(false);
- sslEngine.setNeedClientAuth(needClientAuth);
- return new SSLSocketChannel2( channel, sslEngine, exec, key );
- }
-
- public boolean getNeedClientAuth() {
- return needClientAuth;
- }
-
- public void setNeedClientAuth(boolean needClientAuth) {
- this.needClientAuth = needClientAuth;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
new file mode 100644
index 0000000..1e2ade6
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest;
+import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest;
+import org.apache.zeppelin.server.JsonResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
+import com.wordnik.swagger.annotations.ApiResponse;
+import com.wordnik.swagger.annotations.ApiResponses;
+
+/**
+ * Interpreter Rest API
+ *
+ */
+@Path("/interpreter")
+@Produces("application/json")
+@Api(value = "/interpreter", description = "Zeppelin Interpreter REST API")
+public class InterpreterRestApi {
+ Logger logger = LoggerFactory.getLogger(InterpreterRestApi.class);
+
+ private InterpreterFactory interpreterFactory;
+
+ Gson gson = new Gson();
+
+ public InterpreterRestApi() {
+
+ }
+
+ public InterpreterRestApi(InterpreterFactory interpreterFactory) {
+ this.interpreterFactory = interpreterFactory;
+ }
+
+ /**
+ * List all interpreter settings
+ * @return
+ */
+ @GET
+ @Path("setting")
+ @ApiOperation(httpMethod = "GET", value = "List all interpreter setting")
+ @ApiResponses(value = {@ApiResponse(code = 500, message = "When something goes wrong")})
+ public Response listSettings() {
+ List<InterpreterSetting> interpreterSettings = null;
+ interpreterSettings = interpreterFactory.get();
+ return new JsonResponse(Status.OK, "", interpreterSettings).build();
+ }
+
+ /**
+ * Add new interpreter setting
+ * @param message JSON request body, parsed as a NewInterpreterSettingRequest
+ * @return response with status 201 (Created) once the setting has been added
+ * @throws IOException declared by this method; presumably raised while adding the setting
+ * @throws InterpreterException declared by this method; presumably raised by the factory
+ */
+ @POST
+ @Path("setting")
+ @ApiOperation(httpMethod = "POST", value = "Create new interpreter setting")
+ @ApiResponses(value = {@ApiResponse(code = 201, message = "On success")})
+ public Response newSettings(String message) throws InterpreterException, IOException {
+ NewInterpreterSettingRequest request = gson.fromJson(message,
+ NewInterpreterSettingRequest.class);
+ Properties p = new Properties();
+ p.putAll(request.getProperties());
+ interpreterFactory.add(request.getName(), request.getGroup(), request.getOption(), p);
+ return new JsonResponse(Status.CREATED, "").build();
+ }
+
+ @PUT
+ @Path("setting/{settingId}")
+ public Response updateSetting(String message, @PathParam("settingId") String settingId) {
+ logger.info("Update interpreterSetting {}", settingId);
+
+ try {
+ UpdateInterpreterSettingRequest p = gson.fromJson(message,
+ UpdateInterpreterSettingRequest.class);
+ interpreterFactory.setPropertyAndRestart(settingId, p.getOption(), p.getProperties());
+ } catch (InterpreterException e) {
+ return new JsonResponse(Status.NOT_FOUND, e.getMessage(), e).build();
+ } catch (IOException e) {
+ return new JsonResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage(), e).build();
+ }
+ InterpreterSetting setting = interpreterFactory.get(settingId);
+ if (setting == null) {
+ return new JsonResponse(Status.NOT_FOUND, "", settingId).build();
+ }
+ return new JsonResponse(Status.OK, "", setting).build();
+ }
+
+ @DELETE
+ @Path("setting/{settingId}")
+ @ApiOperation(httpMethod = "DELETE", value = "Remove interpreter setting")
+ @ApiResponses(value = {@ApiResponse(code = 500, message = "When something goes wrong")})
+ public Response removeSetting(@PathParam("settingId") String settingId) throws IOException {
+ logger.info("Remove interpreterSetting {}", settingId);
+ interpreterFactory.remove(settingId);
+ return new JsonResponse(Status.OK).build();
+ }
+
+ @PUT
+ @Path("setting/restart/{settingId}")
+ @ApiOperation(httpMethod = "PUT", value = "restart interpreter setting")
+ @ApiResponses(value = {
+ @ApiResponse(code = 404, message = "Not found")})
+ public Response restartSetting(@PathParam("settingId") String settingId) {
+ logger.info("Restart interpreterSetting {}", settingId);
+ try {
+ interpreterFactory.restart(settingId);
+ } catch (InterpreterException e) {
+ return new JsonResponse(Status.NOT_FOUND, e.getMessage(), e).build();
+ }
+ InterpreterSetting setting = interpreterFactory.get(settingId);
+ if (setting == null) {
+ return new JsonResponse(Status.NOT_FOUND, "", settingId).build();
+ }
+ return new JsonResponse(Status.OK, "", setting).build();
+ }
+
+ /**
+ * List all available interpreters by group
+ */
+ @GET
+ @ApiOperation(httpMethod = "GET", value = "List all available interpreters")
+ @ApiResponses(value = {
+ @ApiResponse(code = 500, message = "When something goes wrong")})
+ public Response listInterpreter(String message) {
+ Map<String, RegisteredInterpreter> m = Interpreter.registeredInterpreters;
+ return new JsonResponse(Status.OK, "", m).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookResponse.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookResponse.java
new file mode 100644
index 0000000..1397ac1
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookResponse.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Response wrapper.
+ *
+ * @author anthonycorbacho
+ *
+ */
+@XmlRootElement
+public class NotebookResponse {
+ private String msg;
+
+ public NotebookResponse() {}
+
+ public NotebookResponse(String msg) {
+ this.msg = msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
new file mode 100644
index 0000000..8a933f7
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.message.InterpreterSettingListForNoteBind;
+import org.apache.zeppelin.server.JsonResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * Rest api endpoint for the noteBook.
+ */
+@Path("/notebook")
+@Produces("application/json")
+public class NotebookRestApi {
+ Logger logger = LoggerFactory.getLogger(NotebookRestApi.class);
+ Gson gson = new Gson();
+ private Notebook notebook;
+
+ public NotebookRestApi() {}
+
+ public NotebookRestApi(Notebook notebook) {
+ this.notebook = notebook;
+ }
+
+ /**
+ * bind a setting to note
+ * @throws IOException
+ */
+ @PUT
+ @Path("interpreter/bind/{noteId}")
+ public Response bind(@PathParam("noteId") String noteId, String req) throws IOException {
+ List<String> settingIdList = gson.fromJson(req, new TypeToken<List<String>>(){}.getType());
+ notebook.bindInterpretersToNote(noteId, settingIdList);
+ return new JsonResponse(Status.OK).build();
+ }
+
+ /**
+ * list binded setting
+ */
+ @GET
+ @Path("interpreter/bind/{noteId}")
+ public Response bind(@PathParam("noteId") String noteId) {
+ List<InterpreterSettingListForNoteBind> settingList
+ = new LinkedList<InterpreterSettingListForNoteBind>();
+
+ List<InterpreterSetting> selectedSettings = notebook.getBindedInterpreterSettings(noteId);
+ for (InterpreterSetting setting : selectedSettings) {
+ settingList.add(new InterpreterSettingListForNoteBind(
+ setting.id(),
+ setting.getName(),
+ setting.getGroup(),
+ setting.getInterpreterGroup(),
+ true)
+ );
+ }
+
+ List<InterpreterSetting> availableSettings = notebook.getInterpreterFactory().get();
+ for (InterpreterSetting setting : availableSettings) {
+ boolean selected = false;
+ for (InterpreterSetting selectedSetting : selectedSettings) {
+ if (selectedSetting.id().equals(setting.id())) {
+ selected = true;
+ break;
+ }
+ }
+
+ if (!selected) {
+ settingList.add(new InterpreterSettingListForNoteBind(
+ setting.id(),
+ setting.getName(),
+ setting.getGroup(),
+ setting.getInterpreterGroup(),
+ false)
+ );
+ }
+ }
+ return new JsonResponse(Status.OK, "", settingList).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ZeppelinRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ZeppelinRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ZeppelinRestApi.java
new file mode 100644
index 0000000..4fc47a4
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ZeppelinRestApi.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+
+import com.wordnik.swagger.annotations.Api;
+
+/**
+ * Zeppelin root rest api endpoint.
+ *
+ * @author anthonycorbacho
+ * @since 0.3.4
+ */
+@Path("/")
+@Api(value = "/", description = "Zeppelin REST API root")
+public class ZeppelinRestApi {
+
+ /**
+ * Required by Swagger.
+ */
+ public ZeppelinRestApi() {
+ super();
+ }
+
+ /**
+ * Get the root endpoint Return always 200.
+ *
+ * @return 200 response
+ */
+ @GET
+ public Response getRoot() {
+ return Response.ok().build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java
new file mode 100644
index 0000000..b74054c
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/InterpreterSettingListForNoteBind.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.message;
+
+import java.util.List;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+
+/**
+ * InterpreterSetting information for binding.
+ *
+ * <p>Plain data holder carrying the setting's id, name and group, the list of its
+ * interpreters and a {@code selected} flag.
+ */
+public class InterpreterSettingListForNoteBind {
+  String id;
+  String name;
+  String group;
+  private boolean selected;
+  private List<Interpreter> interpreters;
+
+  /**
+   * Creates a fully populated entry.
+   *
+   * @param id setting id
+   * @param name setting name
+   * @param group setting group
+   * @param interpreters interpreters belonging to the setting
+   * @param selected value of the selected flag
+   */
+  public InterpreterSettingListForNoteBind(String id, String name,
+      String group, List<Interpreter> interpreters, boolean selected) {
+    this.selected = selected;
+    this.interpreters = interpreters;
+    this.group = group;
+    this.name = name;
+    this.id = id;
+  }
+
+  // ---- read accessors ----
+
+  public String getId() {
+    return this.id;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public String getGroup() {
+    return this.group;
+  }
+
+  /**
+   * Despite the name, this returns the interpreter objects themselves.
+   *
+   * @return the list reference held by this object (not a copy)
+   */
+  public List<Interpreter> getInterpreterNames() {
+    return this.interpreters;
+  }
+
+  public boolean isSelected() {
+    return this.selected;
+  }
+
+  // ---- write accessors ----
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public void setGroup(String group) {
+    this.group = group;
+  }
+
+  /**
+   * Replaces the interpreter list; the given reference is stored as is.
+   *
+   * @param interpreters new interpreter list
+   */
+  public void setInterpreterNames(List<Interpreter> interpreters) {
+    this.interpreters = interpreters;
+  }
+
+  public void setSelected(boolean selected) {
+    this.selected = selected;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java
new file mode 100644
index 0000000..6489a71
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/NewInterpreterSettingRequest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.message;
+
+import java.util.Map;
+
+import org.apache.zeppelin.interpreter.InterpreterOption;
+
+/**
+ * NewInterpreterSetting rest api request message.
+ *
+ * <p>This class defines no setters and no populating constructor; the package-private
+ * fields are exposed read-only through the getters below.
+ */
+public class NewInterpreterSettingRequest {
+  String name;
+  String group;
+  InterpreterOption option;
+  Map<String, String> properties;
+
+  /** Creates an empty request; every field starts out as {@code null}. */
+  public NewInterpreterSettingRequest() {
+  }
+
+  /** @return the name held by this request */
+  public String getName() {
+    return this.name;
+  }
+
+  /** @return the group held by this request */
+  public String getGroup() {
+    return this.group;
+  }
+
+  /** @return the interpreter option held by this request */
+  public InterpreterOption getOption() {
+    return this.option;
+  }
+
+  /** @return the property map held by this request (the stored reference, not a copy) */
+  public Map<String, String> getProperties() {
+    return this.properties;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java
new file mode 100644
index 0000000..98f4ab7
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/message/UpdateInterpreterSettingRequest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.rest.message;
+
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.InterpreterOption;
+
+/**
+ * Request message carrying the interpreter option and the properties of an
+ * interpreter setting update.
+ */
+public class UpdateInterpreterSettingRequest {
+  InterpreterOption option;
+  Properties properties;
+
+  /**
+   * Creates a request holding the given values; both references are stored as is.
+   *
+   * @param option interpreter option
+   * @param properties interpreter properties
+   */
+  public UpdateInterpreterSettingRequest(InterpreterOption option,
+      Properties properties) {
+    this.properties = properties;
+    this.option = option;
+  }
+
+  /** @return the interpreter option passed at construction */
+  public InterpreterOption getOption() {
+    return this.option;
+  }
+
+  /** @return the properties passed at construction */
+  public Properties getProperties() {
+    return this.properties;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java
new file mode 100644
index 0000000..8c8f9a7
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.util.resource.Resource;
+
+/**
+ * Simple servlet to dynamically set the Websocket port
+ * in the JavaScript sent to the client.
+ *
+ * <p>Requests for any URI other than the known application scripts are handled by the
+ * parent {@link DefaultServlet}. For the application scripts the file is read, the body of
+ * the JavaScript {@code getPort()} function is replaced so that it returns the configured
+ * websocket port, and the result is written to the response.
+ */
+public class AppScriptServlet extends DefaultServlet {
+
+  // Hash containing the possible scripts that contain the getPort()
+  // function originally defined in app.js
+  private static Set<String> scriptPaths = new HashSet<String>(
+      Arrays.asList(
+          "/scripts/scripts.js",
+          "/scripts/app.js"
+      )
+  );
+
+  private int websocketPort;
+
+  /**
+   * @param websocketPort port number that the rewritten {@code getPort()} will return
+   */
+  public AppScriptServlet(int websocketPort) {
+    this.websocketPort = websocketPort;
+  }
+
+  /**
+   * Serves the request, rewriting {@code getPort()} when an application script is requested.
+   *
+   * @param request incoming HTTP request
+   * @param response HTTP response to write to
+   * @throws ServletException propagated from the parent servlet
+   * @throws IOException if the script cannot be read or the response cannot be written
+   */
+  @Override
+  protected void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException,
+          IOException {
+
+    // Process all requests not for the app script to the parent
+    // class
+    String uri = request.getRequestURI();
+    if (!scriptPaths.contains(uri)) {
+      super.doGet(request, response);
+      return;
+    }
+
+    // A script that is not deployed is left to the parent servlet,
+    // instead of failing with a NullPointerException here.
+    Resource scriptFile = getResource(uri);
+    if (scriptFile == null || !scriptFile.exists()) {
+      super.doGet(request, response);
+      return;
+    }
+
+    String script = injectPort(readFully(scriptFile));
+
+    // The script was decoded as UTF-8, so it must be written back as UTF-8; the
+    // encoding has to be set before getWriter() is called to take effect.
+    response.setContentType("application/javascript");
+    response.setCharacterEncoding("UTF-8");
+    response.getWriter().println(script);
+  }
+
+  /**
+   * Reads the whole resource and decodes it as UTF-8.
+   *
+   * <p>Reading continues until end of stream ({@code read} returns -1) rather than while
+   * {@code available() > 0}, because {@code available()} is only an estimate of what can
+   * be read without blocking. The bytes are decoded once at the end so that a multi-byte
+   * character spanning two buffer fills is not corrupted. The stream is always closed.
+   */
+  private static String readFully(Resource scriptFile) throws IOException {
+    InputStream is = scriptFile.getInputStream();
+    try {
+      java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
+      byte[] buffer = new byte[1024];
+      int numRead;
+      while ((numRead = is.read(buffer)) != -1) {
+        bytes.write(buffer, 0, numRead);
+      }
+      return bytes.toString("UTF-8");
+    } finally {
+      is.close();
+    }
+  }
+
+  /**
+   * Replaces the text from "function getPort()" up to and including the next "}" with a
+   * function returning the websocket port. The source is returned unchanged when either
+   * marker is missing.
+   */
+  private String injectPort(String source) {
+    StringBuilder script = new StringBuilder(source);
+    int startIndex = script.indexOf("function getPort()");
+    if (startIndex >= 0) {
+      int endIndex = script.indexOf("}", startIndex);
+      if (endIndex >= 0) {
+        String replaceString = "function getPort(){return " + websocketPort + "}";
+        script.replace(startIndex, endIndex + 1, replaceString);
+      }
+    }
+    return script.toString();
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/server/CorsFilter.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/CorsFilter.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/CorsFilter.java
new file mode 100644
index 0000000..1524d5b
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/CorsFilter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.server;
+
+import java.io.IOException;
+import java.text.DateFormat;
+import java.util.Date;
+import java.util.Locale;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Cors filter.
+ *
+ * <p>Adds the CORS response headers to every HTTP response. An {@code OPTIONS} request is
+ * answered by the filter itself: the headers are set and the rest of the chain is not
+ * invoked.
+ */
+public class CorsFilter implements Filter {
+
+  /**
+   * Adds CORS headers and forwards the request unless its method is {@code OPTIONS}.
+   *
+   * <p>Both casts are guarded by {@code instanceof}, so a non-HTTP request or response
+   * passes through the chain untouched instead of raising a ClassCastException.
+   *
+   * @param request incoming request
+   * @param response outgoing response
+   * @param filterChain remaining filter chain
+   * @throws IOException propagated from the chain
+   * @throws ServletException propagated from the chain
+   */
+  @Override
+  public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain)
+      throws IOException, ServletException {
+    if (response instanceof HttpServletResponse) {
+      addCorsHeaders((HttpServletResponse) response);
+    }
+
+    if (request instanceof HttpServletRequest
+        && "OPTIONS".equals(((HttpServletRequest) request).getMethod())) {
+      // OPTIONS request: the headers above are the whole answer.
+      return;
+    }
+
+    filterChain.doFilter(request, response);
+  }
+
+  /**
+   * Writes the CORS headers and the Date header onto the response.
+   *
+   * @param response HTTP response to decorate
+   */
+  private void addCorsHeaders(HttpServletResponse response) {
+    // NOTE(review): browsers do not accept a wildcard origin together with
+    // Access-Control-Allow-Credentials: true for credentialed requests; left unchanged
+    // here, confirm the intended policy.
+    response.addHeader("Access-Control-Allow-Origin", "*");
+    response.addHeader("Access-Control-Allow-Credentials", "true");
+    response.addHeader("Access-Control-Allow-Headers", "authorization,Content-Type");
+    response.addHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, HEAD, DELETE");
+    // setDateHeader lets the container emit the HTTP-date format; the previous
+    // DateFormat.FULL text was not a valid HTTP Date value.
+    response.setDateHeader("Date", System.currentTimeMillis());
+  }
+
+  @Override
+  public void destroy() {}
+
+  @Override
+  public void init(FilterConfig filterConfig) throws ServletException {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java
new file mode 100644
index 0000000..28a3bb8
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.server;
+
+import java.util.ArrayList;
+
+import javax.ws.rs.core.NewCookie;
+import javax.ws.rs.core.Response.ResponseBuilder;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterSerializer;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Json response builder.
+ *
+ * <p>Holds a status, an optional message and an optional body, serializes itself to JSON
+ * with Gson in {@link #toString()} and turns that JSON into a JAX-RS response in
+ * {@link #build()}. The {@code cookies} and {@code pretty} fields are declared
+ * {@code transient}, which Gson excludes from serialization by default.
+ *
+ * @author Leemoonsoo
+ *
+ * @param <T> type of the response body
+ */
+public class JsonResponse<T> {
+  private javax.ws.rs.core.Response.Status status;
+  private String message;
+  private T body;
+  transient ArrayList<NewCookie> cookies;
+  transient boolean pretty = false;
+
+  /**
+   * Creates a response with a status only.
+   *
+   * @param status HTTP status of the response
+   */
+  public JsonResponse(javax.ws.rs.core.Response.Status status) {
+    this.status = status;
+    this.message = null;
+    this.body = null;
+  }
+
+  /**
+   * Creates a response with a status and a message.
+   *
+   * @param status HTTP status of the response
+   * @param message message text
+   */
+  public JsonResponse(javax.ws.rs.core.Response.Status status, String message) {
+    this.status = status;
+    this.message = message;
+    this.body = null;
+  }
+
+  /**
+   * Creates a response with a status and a body.
+   *
+   * @param status HTTP status of the response
+   * @param body body object to serialize
+   */
+  public JsonResponse(javax.ws.rs.core.Response.Status status, T body) {
+    this.status = status;
+    this.message = null;
+    this.body = body;
+  }
+
+  /**
+   * Creates a response with a status, a message and a body.
+   *
+   * @param status HTTP status of the response
+   * @param message message text
+   * @param body body object to serialize
+   */
+  public JsonResponse(javax.ws.rs.core.Response.Status status, String message, T body) {
+    this.status = status;
+    this.message = message;
+    this.body = body;
+  }
+
+  /**
+   * Switches pretty printing of the generated JSON on or off.
+   *
+   * @param pretty true to pretty-print
+   * @return this instance, for chaining
+   */
+  public JsonResponse<T> setPretty(boolean pretty) {
+    this.pretty = pretty;
+    return this;
+  }
+
+  /**
+   * Add cookie for building.
+   *
+   * @param newCookie cookie to attach to the built response
+   * @return this instance, for chaining
+   */
+  public JsonResponse<T> addCookie(NewCookie newCookie) {
+    if (cookies == null) {
+      // created lazily on first use
+      cookies = new ArrayList<NewCookie>();
+    }
+    cookies.add(newCookie);
+
+    return this;
+  }
+
+  /**
+   * Add cookie for building.
+   *
+   * <p>Returns {@code JsonResponse<T>} like the other overload, so the body type is not
+   * lost when calls are chained.
+   *
+   * @param name cookie name
+   * @param value cookie value
+   * @return this instance, for chaining
+   */
+  public JsonResponse<T> addCookie(String name, String value) {
+    return addCookie(new NewCookie(name, value));
+  }
+
+  /**
+   * Serializes this object to JSON; {@link Interpreter} values are written through
+   * {@link InterpreterSerializer}.
+   *
+   * @return JSON text, pretty-printed when {@code pretty} is set
+   */
+  @Override
+  public String toString() {
+    GsonBuilder gsonBuilder = new GsonBuilder()
+        .registerTypeAdapter(Interpreter.class, new InterpreterSerializer());
+    if (pretty) {
+      gsonBuilder.setPrettyPrinting();
+    }
+    Gson gson = gsonBuilder.create();
+    return gson.toJson(this);
+  }
+
+  public javax.ws.rs.core.Response.Status getCode() {
+    return status;
+  }
+
+  public void setCode(javax.ws.rs.core.Response.Status status) {
+    this.status = status;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public T getBody() {
+    return body;
+  }
+
+  public void setBody(T body) {
+    this.body = body;
+  }
+
+  /**
+   * Builds the JAX-RS response: the status, the JSON from {@link #toString()} as entity,
+   * and every cookie added so far.
+   *
+   * @return the built response
+   */
+  public javax.ws.rs.core.Response build() {
+    ResponseBuilder r = javax.ws.rs.core.Response.status(status).entity(this.toString());
+    if (cookies != null) {
+      for (NewCookie nc : cookies) {
+        r.cookie(nc);
+      }
+    }
+    return r.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
new file mode 100644
index 0000000..1c9aa03
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.Application;
+
+import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.rest.InterpreterRestApi;
+import org.apache.zeppelin.rest.NotebookRestApi;
+import org.apache.zeppelin.rest.ZeppelinRestApi;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.apache.zeppelin.socket.NotebookServer;
+import org.apache.zeppelin.socket.SslWebSocketServerFactory;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.bio.SocketConnector;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.session.SessionHandler;
+import org.eclipse.jetty.server.ssl.SslSocketConnector;
+import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.webapp.WebAppContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.wordnik.swagger.jersey.config.JerseyJaxrsConfig;
+
+/**
+ * Main class of Zeppelin.
+ *
+ * <p>{@link #main(String[])} assembles an embedded Jetty server (REST API context, Swagger
+ * configuration and web UI) together with the notebook websocket server and starts both.
+ * The class is also the JAX-RS {@link Application} whose class name is handed to the CXF
+ * servlet, and in that role it supplies the REST resource singletons.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class ZeppelinServer extends Application {
+  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class);
+
+  private SchedulerFactory schedulerFactory;
+  // Assigned by the constructor, so it stays null until an instance has been created.
+  public static Notebook notebook;
+
+  static NotebookServer notebookServer;
+
+  private InterpreterFactory replFactory;
+
+  /**
+   * Starts the websocket server and the Jetty server, registers a shutdown hook and then
+   * blocks until Jetty terminates.
+   *
+   * @param args command line arguments, stored in the configuration under "args"
+   * @throws Exception if a server cannot be configured or started
+   */
+  public static void main(String[] args) throws Exception {
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    conf.setProperty("args", args);
+
+    final Server jettyServer = setupJettyServer(conf);
+    notebookServer = setupNotebookServer(conf);
+
+    // REST api
+    final ServletContextHandler restApi = setupRestApiContextHandler();
+    // NOTE: Swagger-core is included via the web.xml in zeppelin-web
+    // But the rest of swagger is configured here
+    final ServletContextHandler swagger = setupSwaggerContextHandler(conf);
+
+    // Web UI
+    final WebAppContext webApp = setupWebAppContext(conf);
+    //Below is commented since zeppelin-docs module is removed.
+    //final WebAppContext webAppSwagg = setupWebAppSwagger(conf);
+
+    // add all handlers
+    ContextHandlerCollection contexts = new ContextHandlerCollection();
+    //contexts.setHandlers(new Handler[]{swagger, restApi, webApp, webAppSwagg});
+    contexts.setHandlers(new Handler[]{swagger, restApi, webApp});
+    jettyServer.setHandler(contexts);
+
+    notebookServer.start();
+    LOG.info("Start zeppelin server");
+    jettyServer.start();
+    LOG.info("Started");
+
+    Runtime.getRuntime().addShutdownHook(new Thread(){
+      @Override public void run() {
+        LOG.info("Shutting down Zeppelin Server ... ");
+        // Each step is attempted on its own so that a failure in one of them
+        // does not prevent the remaining components from being stopped.
+        try {
+          // notebook is only set once a ZeppelinServer instance has been constructed
+          if (notebook != null) {
+            notebook.getInterpreterFactory().close();
+          }
+        } catch (Exception e) {
+          LOG.error("Error while closing interpreters", e);
+        }
+        try {
+          jettyServer.stop();
+        } catch (Exception e) {
+          LOG.error("Error while stopping servlet container", e);
+        }
+        try {
+          notebookServer.stop();
+        } catch (Exception e) {
+          LOG.error("Error while stopping notebook websocket server", e);
+        }
+        LOG.info("Bye");
+      }
+    });
+
+
+    // when zeppelin is started inside of ide (especially for eclipse)
+    // for graceful shutdown, input any key in console window
+    if (System.getenv("ZEPPELIN_IDENT_STRING") == null) {
+      try {
+        System.in.read();
+      } catch (IOException e) {
+        // Not fatal: the process exits right below either way, but leave a trace.
+        LOG.warn("Could not read from standard input while waiting for shutdown key", e);
+      }
+      System.exit(0);
+    }
+
+    jettyServer.join();
+  }
+
+  /**
+   * Creates the Jetty server with a single connector, SSL-enabled when the
+   * configuration asks for it, listening on the configured server port.
+   */
+  private static Server setupJettyServer(ZeppelinConfiguration conf)
+      throws Exception {
+
+    SocketConnector connector;
+    if (conf.useSsl()) {
+      connector = new SslSocketConnector(getSslContextFactory(conf));
+    }
+    else {
+      connector = new SocketConnector();
+    }
+
+    // Set some timeout options to make debugging easier.
+    int timeout = 1000 * 30;
+    connector.setMaxIdleTime(timeout);
+    connector.setSoLingerTime(-1);
+    connector.setPort(conf.getServerPort());
+
+    final Server server = new Server();
+    server.addConnector(connector);
+
+    return server;
+  }
+
+  /**
+   * Creates the notebook websocket server on the configured websocket port.
+   */
+  private static NotebookServer setupNotebookServer(ZeppelinConfiguration conf)
+      throws Exception {
+
+    NotebookServer server = new NotebookServer(conf.getWebSocketPort());
+
+    // Default WebSocketServer uses unencrypted connector, so only need to
+    // change the connector if SSL should be used.
+    if (conf.useSsl()) {
+      SslWebSocketServerFactory wsf = new SslWebSocketServerFactory(getSslContext(conf));
+      wsf.setNeedClientAuth(conf.useClientAuth());
+      server.setWebSocketFactory(wsf);
+    }
+
+    return server;
+  }
+
+  /**
+   * Builds a Jetty SslContextFactory from the keystore, truststore and
+   * client-auth settings found in the configuration.
+   */
+  private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf)
+      throws Exception {
+
+    // Note that the API for the SslContextFactory is different for
+    // Jetty version 9
+    SslContextFactory sslContextFactory = new SslContextFactory();
+
+    // Set keystore
+    sslContextFactory.setKeyStore(conf.getKeyStorePath());
+    sslContextFactory.setKeyStoreType(conf.getKeyStoreType());
+    sslContextFactory.setKeyStorePassword(conf.getKeyStorePassword());
+    sslContextFactory.setKeyManagerPassword(conf.getKeyManagerPassword());
+
+    // Set truststore
+    sslContextFactory.setTrustStore(conf.getTrustStorePath());
+    sslContextFactory.setTrustStoreType(conf.getTrustStoreType());
+    sslContextFactory.setTrustStorePassword(conf.getTrustStorePassword());
+
+    sslContextFactory.setNeedClientAuth(conf.useClientAuth());
+
+    return sslContextFactory;
+  }
+
+  /**
+   * Returns the SSLContext of a freshly configured SslContextFactory, starting the
+   * factory first when it is not started yet.
+   */
+  private static SSLContext getSslContext(ZeppelinConfiguration conf)
+      throws Exception {
+
+    SslContextFactory scf = getSslContextFactory(conf);
+    if (!scf.isStarted()) {
+      scf.start();
+    }
+    return scf.getSslContext();
+  }
+
+  /**
+   * Creates the "/api" context: a CXF JAX-RS servlet configured with this class as its
+   * Application, with the CorsFilter applied to every path and dispatcher type.
+   */
+  private static ServletContextHandler setupRestApiContextHandler() {
+    final ServletHolder cxfServletHolder = new ServletHolder(new CXFNonSpringJaxrsServlet());
+    cxfServletHolder.setInitParameter("javax.ws.rs.Application", ZeppelinServer.class.getName());
+    cxfServletHolder.setName("rest");
+    cxfServletHolder.setForcedPath("rest");
+
+    final ServletContextHandler cxfContext = new ServletContextHandler();
+    cxfContext.setSessionHandler(new SessionHandler());
+    cxfContext.setContextPath("/api");
+    cxfContext.addServlet(cxfServletHolder, "/*");
+
+    cxfContext.addFilter(new FilterHolder(CorsFilter.class), "/*",
+        EnumSet.allOf(DispatcherType.class));
+    return cxfContext;
+  }
+
+  /**
+   * Swagger core handler - Needed for the RestFul api documentation.
+   *
+   * @return ServletContextHandler of Swagger
+   */
+  private static ServletContextHandler setupSwaggerContextHandler(
+      ZeppelinConfiguration conf) {
+
+    // Configure Swagger-core
+    final ServletHolder swaggerServlet =
+        new ServletHolder(new JerseyJaxrsConfig());
+    swaggerServlet.setName("JerseyJaxrsConfig");
+    swaggerServlet.setInitParameter("api.version", "1.0.0");
+    // NOTE(review): the base path is fixed to http://localhost regardless of
+    // the SSL setting or the host the server is reached on.
+    swaggerServlet.setInitParameter(
+        "swagger.api.basepath",
+        "http://localhost:" + conf.getServerPort() + "/api");
+    swaggerServlet.setInitOrder(2);
+
+    // Setup the handler
+    final ServletContextHandler handler = new ServletContextHandler();
+    handler.setSessionHandler(new SessionHandler());
+    // Bind Swagger-core to the url HOST/api-docs
+    handler.addServlet(swaggerServlet, "/api-docs/*");
+
+    // And we are done
+    return handler;
+  }
+
+  /**
+   * Creates the web UI context from the configured WAR location (a directory or a
+   * packaged WAR file) and binds the AppScriptServlet to every path.
+   */
+  private static WebAppContext setupWebAppContext(
+      ZeppelinConfiguration conf) {
+
+    WebAppContext webApp = new WebAppContext();
+    File warPath = new File(conf.getString(ConfVars.ZEPPELIN_WAR));
+    if (warPath.isDirectory()) {
+      // Development mode, read from FS
+      // webApp.setDescriptor(warPath+"/WEB-INF/web.xml");
+      webApp.setResourceBase(warPath.getPath());
+      webApp.setContextPath("/");
+      webApp.setParentLoaderPriority(true);
+    } else {
+      // use packaged WAR
+      webApp.setWar(warPath.getAbsolutePath());
+    }
+    // Explicit bind to root
+    webApp.addServlet(
+        new ServletHolder(new AppScriptServlet(conf.getWebSocketPort())),
+        "/*"
+    );
+    return webApp;
+  }
+
+  /**
+   * Handles the WebApplication for Swagger-ui.
+   *
+   * @return WebAppContext with swagger ui context
+   */
+  /*private static WebAppContext setupWebAppSwagger(
+      ZeppelinConfiguration conf) {
+
+    WebAppContext webApp = new WebAppContext();
+    File warPath = new File(conf.getString(ConfVars.ZEPPELIN_API_WAR));
+
+    if (warPath.isDirectory()) {
+      webApp.setResourceBase(warPath.getPath());
+    } else {
+      webApp.setWar(warPath.getAbsolutePath());
+    }
+    webApp.setContextPath("/docs");
+    webApp.setParentLoaderPriority(true);
+    // Bind swagger-ui to the path HOST/docs
+    webApp.addServlet(new ServletHolder(new DefaultServlet()), "/docs/*");
+    return webApp;
+  }*/
+
+  /**
+   * Creates the scheduler factory, the interpreter factory and the Notebook, and
+   * publishes the Notebook through the static {@link #notebook} field.
+   *
+   * @throws Exception if one of the components cannot be created
+   */
+  public ZeppelinServer() throws Exception {
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+
+    this.schedulerFactory = new SchedulerFactory();
+
+    this.replFactory = new InterpreterFactory(conf);
+    notebook = new Notebook(conf, schedulerFactory, replFactory, notebookServer);
+  }
+
+  /**
+   * @return an empty set; all resources are provided by {@link #getSingletons()}
+   */
+  @Override
+  public Set<Class<?>> getClasses() {
+    Set<Class<?>> classes = new HashSet<Class<?>>();
+    return classes;
+  }
+
+  /**
+   * @return the REST resource instances: root, notebook and interpreter API
+   */
+  @Override
+  public java.util.Set<java.lang.Object> getSingletons() {
+    Set<Object> singletons = new HashSet<Object>();
+
+    // Rest-api root endpoint
+    ZeppelinRestApi root = new ZeppelinRestApi();
+    singletons.add(root);
+
+    NotebookRestApi notebookApi = new NotebookRestApi(notebook);
+    singletons.add(notebookApi);
+
+    InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);
+    singletons.add(interpreterApi);
+
+    return singletons;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
new file mode 100644
index 0000000..a7b8b66
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/Message.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.socket;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Zeppelin websocket message template class.
+ *
+ * <p>A message is an operation code plus a free-form map of named values.
+ *
+ * @author Leemoonsoo
+ *
+ */
+public class Message {
+  /**
+   * Representation of event type.
+   *
+   * <p>In the comments below, [c-s] marks a client-to-server operation and
+   * [s-c] a server-to-client operation.
+   *
+   * @author Leemoonsoo
+   *
+   */
+  public enum OP {
+    GET_NOTE,         // [c-s] client load note
+                      // @param id note id
+
+    NOTE,             // [s-c] note info
+                      // @param note serialized Note object
+
+    PARAGRAPH,        // [s-c] paragraph info
+                      // @param paragraph serialized paragraph object
+
+    PROGRESS,         // [s-c] progress update
+                      // @param id paragraph id
+                      // @param progress percentage progress
+
+    NEW_NOTE,         // [c-s] create new notebook
+    DEL_NOTE,         // [c-s] delete notebook
+                      // @param id note id
+    NOTE_UPDATE,
+
+    RUN_PARAGRAPH,    // [c-s] run paragraph
+                      // @param id paragraph id
+                      // @param paragraph paragraph content, i.e. script
+                      // @param config paragraph config
+                      // @param params paragraph params
+
+    COMMIT_PARAGRAPH, // [c-s] commit paragraph
+                      // @param id paragraph id
+                      // @param title paragraph title
+                      // @param paragraph paragraph content, i.e. script
+                      // @param config paragraph config
+                      // @param params paragraph params
+
+    CANCEL_PARAGRAPH, // [c-s] cancel paragraph run
+                      // @param id paragraph id
+
+    MOVE_PARAGRAPH,   // [c-s] move paragraph order
+                      // @param id paragraph id
+                      // @param index index the paragraph should move to
+
+    INSERT_PARAGRAPH, // [c-s] create new paragraph below current paragraph
+                      // @param target index
+
+    COMPLETION,       // [c-s] ask completion candidates
+                      // @param id
+                      // @param buf current code
+                      // @param cursor cursor position in code
+
+    COMPLETION_LIST,  // [s-c] send back completion candidates list
+                      // @param id
+                      // @param completions list of string
+
+    LIST_NOTES,       // [c-s] ask list of note
+
+    NOTES_INFO,       // [s-c] list of note infos
+                      // @param notes serialized List<NoteInfo> object
+
+    PARAGRAPH_REMOVE,
+  }
+
+  public OP op;
+  public Map<String, Object> data = new HashMap<String, Object>();
+
+  /**
+   * Creates a message for the given operation with an empty data map.
+   *
+   * @param op operation code of the message
+   */
+  public Message(OP op) {
+    this.op = op;
+  }
+
+  /**
+   * Stores a value in the data map under the given key.
+   *
+   * @param k key
+   * @param v value
+   * @return this message, for chaining
+   */
+  public Message put(String k, Object v) {
+    this.data.put(k, v);
+    return this;
+  }
+
+  /**
+   * Looks up a value in the data map.
+   *
+   * @param k key
+   * @return the value stored under the key, or {@code null} when there is none
+   */
+  public Object get(String k) {
+    final Object value = this.data.get(k);
+    return value;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
new file mode 100644
index 0000000..db5733e
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.socket;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.notebook.JobListenerFactory;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.JobListener;
+import org.apache.zeppelin.scheduler.Job.Status;
+import org.apache.zeppelin.server.ZeppelinServer;
+import org.apache.zeppelin.socket.Message.OP;
+import org.java_websocket.WebSocket;
+import org.java_websocket.handshake.ClientHandshake;
+import org.java_websocket.server.WebSocketServer;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+
+/**
+ * Zeppelin websocket service.
+ *
+ * @author anthonycorbacho
+ */
+public class NotebookServer extends WebSocketServer implements JobListenerFactory {
+
+ // Class-wide logger.
+ private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
+ // Port bound by the no-argument constructor.
+ private static final int DEFAULT_PORT = 8282;
+
+ /**
+  * Logs the port the websocket server is being created on.
+  *
+  * @param port port handed to the server constructor
+  */
+ private static void creatingwebSocketServerLog(int port) {
+ LOG.info("Create zeppelin websocket on port {}", port);
+ }
+
+ // JSON (de)serializer used for every websocket message.
+ Gson gson = new Gson();
+ // Note id -> sockets attached to that note; accessed under synchronized (noteSocketMap).
+ Map<String, List<WebSocket>> noteSocketMap = new HashMap<String, List<WebSocket>>();
+ // All open connections (added in onOpen, removed in onClose/onError);
+ // accessed under synchronized (connectedSockets).
+ List<WebSocket> connectedSockets = new LinkedList<WebSocket>();
+
+ /** Creates a server bound to {@code DEFAULT_PORT}. */
+ public NotebookServer() {
+ super(new InetSocketAddress(DEFAULT_PORT));
+ creatingwebSocketServerLog(DEFAULT_PORT);
+ }
+
+ /**
+  * Creates a server bound to the given port.
+  *
+  * @param port TCP port the websocket server binds to
+  */
+ public NotebookServer(int port) {
+ super(new InetSocketAddress(port));
+ creatingwebSocketServerLog(port);
+ }
+
+ /** Returns the notebook held in the static {@code ZeppelinServer.notebook} field. */
+ private Notebook notebook() {
+ return ZeppelinServer.notebook;
+ }
+
+ /**
+  * Registers a freshly opened connection so that it receives global broadcasts.
+  *
+  * @param conn the connection that was opened
+  * @param handshake the client handshake (unused)
+  */
+ @Override
+ public void onOpen(WebSocket conn, ClientHandshake handshake) {
+   final InetSocketAddress peer = conn.getRemoteSocketAddress();
+   LOG.info("New connection from {} : {}", peer.getHostName(), peer.getPort());
+   synchronized (connectedSockets) {
+     connectedSockets.add(conn);
+   }
+ }
+
+ /**
+  * Decodes an incoming JSON message and dispatches it to the handler for its operation.
+  * Any exception raised while handling is logged and not propagated.
+  *
+  * @param conn connection the message arrived on
+  * @param msg raw JSON text of the message
+  */
+ @Override
+ public void onMessage(WebSocket conn, String msg) {
+   final Notebook nb = notebook();
+   try {
+     final Message received = deserializeMessage(msg);
+     LOG.info("RECEIVE << {}", received.op);
+     switch (received.op) {
+       // note-level operations
+       case GET_NOTE:
+         sendNote(conn, nb, received);
+         break;
+       case NEW_NOTE:
+         createNote(conn, nb);
+         break;
+       case DEL_NOTE:
+         removeNote(conn, nb, received);
+         break;
+       case NOTE_UPDATE:
+         updateNote(conn, nb, received);
+         break;
+       // paragraph-level operations
+       case COMMIT_PARAGRAPH:
+         updateParagraph(conn, nb, received);
+         break;
+       case RUN_PARAGRAPH:
+         runParagraph(conn, nb, received);
+         break;
+       case CANCEL_PARAGRAPH:
+         cancelParagraph(conn, nb, received);
+         break;
+       case MOVE_PARAGRAPH:
+         moveParagraph(conn, nb, received);
+         break;
+       case INSERT_PARAGRAPH:
+         insertParagraph(conn, nb, received);
+         break;
+       case PARAGRAPH_REMOVE:
+         removeParagraph(conn, nb, received);
+         break;
+       case COMPLETION:
+         completion(conn, nb, received);
+         break;
+       // LIST_NOTES and every operation without a dedicated handler refresh the note list
+       case LIST_NOTES:
+       default:
+         broadcastNoteList();
+         break;
+     }
+   } catch (Exception e) {
+     LOG.error("Can't handle message", e);
+   }
+ }
+
+ /**
+  * Forgets a closed connection: detaches it from every note and from the global list.
+  *
+  * @param conn the connection that was closed
+  * @param code close code (unused)
+  * @param reason close reason (unused)
+  * @param remote whether the peer initiated the close (unused)
+  */
+ @Override
+ public void onClose(WebSocket conn, int code, String reason, boolean remote) {
+   final InetSocketAddress peer = conn.getRemoteSocketAddress();
+   LOG.info("Closed connection to {} : {}", peer.getHostName(), peer.getPort());
+   removeConnectionFromAllNote(conn);
+   synchronized (connectedSockets) {
+     connectedSockets.remove(conn);
+   }
+ }
+
+ @Override
+ public void onError(WebSocket conn, Exception message) {
+ removeConnectionFromAllNote(conn);
+ synchronized (connectedSockets) {
+ connectedSockets.remove(conn);
+ }
+ }
+
+ /**
+  * Parses the JSON text of a websocket frame into a {@link Message}.
+  *
+  * @param msg JSON text
+  * @return whatever Gson produces for {@code msg}
+  */
+ private Message deserializeMessage(String msg) {
+   return gson.fromJson(msg, Message.class);
+ }
+
+ /** Serializes a message to its JSON text form using the shared Gson instance. */
+ private String serializeMessage(Message m) {
+ return gson.toJson(m);
+ }
+
+ /**
+  * Attaches a socket to a note, detaching it from any other note first.
+  *
+  * @param noteId id of the note the socket is now viewing
+  * @param socket connection to attach
+  */
+ private void addConnectionToNote(String noteId, WebSocket socket) {
+   synchronized (noteSocketMap) {
+     // A socket relates to a single note only, so drop every earlier association.
+     removeConnectionFromAllNote(socket);
+
+     List<WebSocket> attached = noteSocketMap.get(noteId);
+     if (attached == null) {
+       attached = new LinkedList<WebSocket>();
+       noteSocketMap.put(noteId, attached);
+     }
+     if (!attached.contains(socket)) {
+       attached.add(socket);
+     }
+   }
+ }
+
+ private void removeConnectionFromNote(String noteId, WebSocket socket) {
+ synchronized (noteSocketMap) {
+ List<WebSocket> socketList = noteSocketMap.get(noteId);
+ if (socketList != null) {
+ socketList.remove(socket);
+ }
+ }
+ }
+
+ /**
+  * Drops the socket list kept for a note.
+  *
+  * @param noteId id of the note whose socket bookkeeping is discarded
+  */
+ private void removeNote(String noteId) {
+   synchronized (noteSocketMap) {
+     noteSocketMap.remove(noteId);
+   }
+ }
+
+ /**
+  * Detaches a socket from every note it is currently attached to.
+  *
+  * @param socket connection to detach
+  */
+ private void removeConnectionFromAllNote(WebSocket socket) {
+   synchronized (noteSocketMap) {
+     for (List<WebSocket> attached : noteSocketMap.values()) {
+       if (attached != null) {
+         attached.remove(socket);
+       }
+     }
+   }
+ }
+
+ /**
+  * Finds the note a socket is attached to.
+  *
+  * @param socket connection to look up
+  * @return id of the note whose socket list contains {@code socket} (the last match in map
+  *     iteration order), or {@code null} when the socket is attached to no note
+  */
+ private String getOpenNoteId(WebSocket socket) {
+   String openNoteId = null;
+   synchronized (noteSocketMap) {
+     for (Map.Entry<String, List<WebSocket>> entry : noteSocketMap.entrySet()) {
+       if (entry.getValue().contains(socket)) {
+         openNoteId = entry.getKey();
+       }
+     }
+   }
+   return openNoteId;
+ }
+
+ private void broadcast(String noteId, Message m) {
+ LOG.info("SEND >> " + m.op);
+ synchronized (noteSocketMap) {
+ List<WebSocket> socketLists = noteSocketMap.get(noteId);
+ if (socketLists == null || socketLists.size() == 0) {
+ return;
+ }
+ for (WebSocket conn : socketLists) {
+ conn.send(serializeMessage(m));
+ }
+ }
+ }
+
+ private void broadcastAll(Message m) {
+ synchronized (connectedSockets) {
+ for (WebSocket conn : connectedSockets) {
+ conn.send(serializeMessage(m));
+ }
+ }
+ }
+
+ /** Sends the whole note, as an {@code OP.NOTE} message, to every socket attached to it. */
+ private void broadcastNote(Note note) {
+ broadcast(note.id(), new Message(OP.NOTE).put("note", note));
+ }
+
+ private void broadcastNoteList() {
+ Notebook notebook = notebook();
+ List<Note> notes = notebook.getAllNotes();
+ List<Map<String, String>> notesInfo = new LinkedList<Map<String, String>>();
+ for (Note note : notes) {
+ Map<String, String> info = new HashMap<String, String>();
+ info.put("id", note.id());
+ info.put("name", note.getName());
+ notesInfo.add(info);
+ }
+ broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo));
+ }
+
+ private void sendNote(WebSocket conn, Notebook notebook, Message fromMessage) {
+ String noteId = (String) fromMessage.get("id");
+ if (noteId == null) {
+ return;
+ }
+ Note note = notebook.getNote(noteId);
+ if (note != null) {
+ addConnectionToNote(note.id(), conn);
+ conn.send(serializeMessage(new Message(OP.NOTE).put("note", note)));
+ }
+ }
+
+ private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage)
+ throws SchedulerException, IOException {
+ String noteId = (String) fromMessage.get("id");
+ String name = (String) fromMessage.get("name");
+ Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+ if (noteId == null) {
+ return;
+ }
+ if (config == null) {
+ return;
+ }
+ Note note = notebook.getNote(noteId);
+ if (note != null) {
+ boolean cronUpdated = isCronUpdated(config, note.getConfig());
+ note.setName(name);
+ note.setConfig(config);
+
+ if (cronUpdated) {
+ notebook.refreshCron(note.id());
+ }
+ note.persist();
+
+ broadcastNote(note);
+ broadcastNoteList();
+ }
+ }
+
+ private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) {
+ boolean cronUpdated = false;
+ if (configA.get("cron") != null && configB.get("cron") != null
+ && configA.get("cron").equals(configB.get("cron"))) {
+ cronUpdated = true;
+ } else if (configA.get("cron") == null && configB.get("cron") == null) {
+ cronUpdated = false;
+ } else if (configA.get("cron") != null || configB.get("cron") != null) {
+ cronUpdated = true;
+ }
+ return cronUpdated;
+ }
+
+ private void createNote(WebSocket conn, Notebook notebook) throws IOException {
+ Note note = notebook.createNote();
+ note.addParagraph(); // it's an empty note. so add one paragraph
+ note.persist();
+ broadcastNote(note);
+ broadcastNoteList();
+ }
+
+ private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage)
+ throws IOException {
+ String noteId = (String) fromMessage.get("id");
+ if (noteId == null) {
+ return;
+ }
+ Note note = notebook.getNote(noteId);
+ note.unpersist();
+ notebook.removeNote(noteId);
+ removeNote(noteId);
+ broadcastNoteList();
+ }
+
+ /**
+  * Handles {@code COMMIT_PARAGRAPH}: stores new params, config, title and text on a
+  * paragraph of the note the connection has open, persists the note and broadcasts the
+  * paragraph.
+  *
+  * <p>Requests from a connection with no open note, or for a paragraph that cannot be found,
+  * are logged and ignored instead of failing with a {@code NullPointerException}.
+  *
+  * @param conn requesting connection; determines which note is edited
+  * @param notebook notebook holding the note
+  * @param fromMessage request carrying "id", "params", "config", "title" and "paragraph"
+  * @throws IOException declared for persisting the note
+  */
+ private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
+     throws IOException {
+   final String paragraphId = (String) fromMessage.get("id");
+   if (paragraphId == null) {
+     return;
+   }
+   final Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
+   final Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+
+   final Note note = notebook.getNote(getOpenNoteId(conn));
+   if (note == null) {
+     LOG.warn("COMMIT_PARAGRAPH ignored: connection has no open note");
+     return;
+   }
+   final Paragraph p = note.getParagraph(paragraphId);
+   if (p == null) {
+     LOG.warn("COMMIT_PARAGRAPH ignored: paragraph {} not found", paragraphId);
+     return;
+   }
+
+   p.settings.setParams(params);
+   p.setConfig(config);
+   p.setTitle((String) fromMessage.get("title"));
+   p.setText((String) fromMessage.get("paragraph"));
+   note.persist();
+   broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p));
+ }
+
+ /**
+  * Handles {@code PARAGRAPH_REMOVE}: removes a paragraph from the note the connection has
+  * open, unless {@code note.isLastParagraph} reports it as the last paragraph.
+  *
+  * <p>Requests from a connection with no open note are logged and ignored instead of failing
+  * with a {@code NullPointerException}.
+  *
+  * @param conn requesting connection; determines which note is edited
+  * @param notebook notebook holding the note
+  * @param fromMessage request; its "id" entry is the paragraph id
+  * @throws IOException declared for persisting the note
+  */
+ private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
+     throws IOException {
+   final String paragraphId = (String) fromMessage.get("id");
+   if (paragraphId == null) {
+     return;
+   }
+   final Note note = notebook.getNote(getOpenNoteId(conn));
+   if (note == null) {
+     LOG.warn("PARAGRAPH_REMOVE ignored: connection has no open note");
+     return;
+   }
+   // We don't want to remove the last paragraph.
+   if (!note.isLastParagraph(paragraphId)) {
+     note.removeParagraph(paragraphId);
+     note.persist();
+     broadcastNote(note);
+   }
+ }
+
+ /**
+  * Handles {@code COMPLETION}: asks the open note for completion candidates and replies
+  * with an {@code OP.COMPLETION_LIST} message.
+  *
+  * <p>When the paragraph id, the cursor position or the open note is missing, the reply is
+  * sent without a "completions" entry. Previously a missing "cursor" or open note raised a
+  * {@code NullPointerException} and the client received no reply at all.
+  *
+  * @param conn requesting connection; determines which note is queried
+  * @param notebook notebook holding the note
+  * @param fromMessage request carrying "id", "buf" and "cursor"
+  */
+ private void completion(WebSocket conn, Notebook notebook, Message fromMessage) {
+   final String paragraphId = (String) fromMessage.get("id");
+   final Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId);
+   if (paragraphId == null) {
+     conn.send(serializeMessage(resp));
+     return;
+   }
+
+   final Object rawCursor = fromMessage.get("cursor");
+   final Note note = notebook.getNote(getOpenNoteId(conn));
+   if (rawCursor == null || note == null) {
+     LOG.warn("COMPLETION for paragraph {} lacks a cursor or an open note", paragraphId);
+     conn.send(serializeMessage(resp));
+     return;
+   }
+
+   final String buffer = (String) fromMessage.get("buf");
+   // The cursor is parsed as a double first and then truncated to an int.
+   final int cursor = (int) Double.parseDouble(rawCursor.toString());
+   final List<String> candidates = note.completion(paragraphId, buffer, cursor);
+   resp.put("completions", candidates);
+   conn.send(serializeMessage(resp));
+ }
+
+ /**
+  * Handles {@code MOVE_PARAGRAPH}: moves a paragraph to a new index inside the note the
+  * connection has open, persists the note and broadcasts it.
+  *
+  * <p>Requests lacking "index", or from a connection with no open note, are logged and
+  * ignored instead of failing with a {@code NullPointerException}.
+  *
+  * @param conn requesting connection; determines which note is edited
+  * @param notebook notebook holding the note
+  * @param fromMessage request carrying "id" and "index"
+  * @throws IOException declared for persisting the note
+  */
+ private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
+     throws IOException {
+   final String paragraphId = (String) fromMessage.get("id");
+   if (paragraphId == null) {
+     return;
+   }
+
+   final Object rawIndex = fromMessage.get("index");
+   final Note note = notebook.getNote(getOpenNoteId(conn));
+   if (rawIndex == null || note == null) {
+     LOG.warn("MOVE_PARAGRAPH for paragraph {} lacks an index or an open note", paragraphId);
+     return;
+   }
+
+   // The index is parsed as a double first and then truncated to an int.
+   final int newIndex = (int) Double.parseDouble(rawIndex.toString());
+   note.moveParagraph(paragraphId, newIndex);
+   note.persist();
+   broadcastNote(note);
+ }
+
+ /**
+  * Handles {@code INSERT_PARAGRAPH}: inserts a new paragraph at the requested index of the
+  * note the connection has open, persists the note and broadcasts it.
+  *
+  * <p>Requests lacking "index", or from a connection with no open note, are logged and
+  * ignored instead of failing with a {@code NullPointerException}.
+  *
+  * @param conn requesting connection; determines which note is edited
+  * @param notebook notebook holding the note
+  * @param fromMessage request; its "index" entry is the insertion index
+  * @throws IOException declared for persisting the note
+  */
+ private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
+     throws IOException {
+   final Object rawIndex = fromMessage.get("index");
+   if (rawIndex == null) {
+     LOG.warn("INSERT_PARAGRAPH ignored: no index given");
+     return;
+   }
+   // The index is parsed as a double first and then truncated to an int.
+   final int index = (int) Double.parseDouble(rawIndex.toString());
+
+   final Note note = notebook.getNote(getOpenNoteId(conn));
+   if (note == null) {
+     LOG.warn("INSERT_PARAGRAPH ignored: connection has no open note");
+     return;
+   }
+   note.insertParagraph(index);
+   note.persist();
+   broadcastNote(note);
+ }
+
+
+ /**
+  * Handles {@code CANCEL_PARAGRAPH}: aborts a paragraph of the note the connection has open.
+  *
+  * <p>Requests from a connection with no open note, or for a paragraph that cannot be found,
+  * are logged and ignored instead of failing with a {@code NullPointerException}.
+  *
+  * @param conn requesting connection; determines which note is used
+  * @param notebook notebook holding the note
+  * @param fromMessage request; its "id" entry is the paragraph id
+  * @throws IOException kept in the signature for parity with the other handlers
+  */
+ private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
+     throws IOException {
+   final String paragraphId = (String) fromMessage.get("id");
+   if (paragraphId == null) {
+     return;
+   }
+
+   final Note note = notebook.getNote(getOpenNoteId(conn));
+   if (note == null) {
+     LOG.warn("CANCEL_PARAGRAPH ignored: connection has no open note");
+     return;
+   }
+   final Paragraph p = note.getParagraph(paragraphId);
+   if (p == null) {
+     LOG.warn("CANCEL_PARAGRAPH ignored: paragraph {} not found", paragraphId);
+     return;
+   }
+   p.abort();
+ }
+
+ private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
+ throws IOException {
+ final String paragraphId = (String) fromMessage.get("id");
+ if (paragraphId == null) {
+ return;
+ }
+ final Note note = notebook.getNote(getOpenNoteId(conn));
+ Paragraph p = note.getParagraph(paragraphId);
+ String text = (String) fromMessage.get("paragraph");
+ p.setText(text);
+ p.setTitle((String) fromMessage.get("title"));
+ Map<String, Object> params = (Map<String, Object>) fromMessage.get("params");
+ p.settings.setParams(params);
+ Map<String, Object> config = (Map<String, Object>) fromMessage.get("config");
+ p.setConfig(config);
+
+ // if it's the last paragraph, let's add a new one
+ boolean isTheLastParagraph = note.getLastParagraph().getId().equals(p.getId());
+ if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) {
+ note.addParagraph();
+ }
+ note.persist();
+ broadcastNote(note);
+
+ try {
+ note.run(paragraphId);
+ }
+ catch (Exception ex) {
+ LOG.error("Exception from run", ex);
+ if (p != null) {
+ p.setReturn(new InterpreterResult(
+ InterpreterResult.Code.ERROR, ex.getMessage()), ex);
+ p.setStatus(Status.ERROR);
+ }
+ }
+ }
+
+ /**
+ * Need description here.
+ *
+ */
+ public static class ParagraphJobListener implements JobListener {
+ private NotebookServer notebookServer;
+ private Note note;
+
+ public ParagraphJobListener(NotebookServer notebookServer, Note note) {
+ this.notebookServer = notebookServer;
+ this.note = note;
+ }
+
+ @Override
+ public void onProgressUpdate(Job job, int progress) {
+ notebookServer.broadcast(note.id(),
+ new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress()));
+ }
+
+ @Override
+ public void beforeStatusChange(Job job, Status before, Status after) {}
+
+ @Override
+ public void afterStatusChange(Job job, Status before, Status after) {
+ if (after == Status.ERROR) {
+ job.getException().printStackTrace();
+ }
+ if (job.isTerminated()) {
+ LOG.info("Job {} is finished", job.getId());
+ try {
+ note.persist();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ notebookServer.broadcastNote(note);
+ }
+ }
+
+ /** Creates a {@link ParagraphJobListener} that broadcasts through this server for the note. */
+ @Override
+ public JobListener getParagraphJobListener(Note note) {
+ return new ParagraphJobListener(this, note);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/669d408d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java
new file mode 100644
index 0000000..f44dc1f
--- /dev/null
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.socket;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.java_websocket.SSLSocketChannel2;
+import org.java_websocket.server.DefaultSSLWebSocketServerFactory;
+
+/**
+ * Extension of the java_websocket library's DefaultSSLWebSocketServerFactory that can
+ * optionally require client side authentication during the SSL handshake. Client
+ * authentication is off until {@link #setNeedClientAuth(boolean)} enables it.
+ */
+public class SslWebSocketServerFactory
+    extends DefaultSSLWebSocketServerFactory {
+
+  /** Whether SSL engines created by this factory demand a client certificate. */
+  protected boolean needClientAuth;
+
+  /**
+   * @param sslcontext SSL context handed to the parent factory
+   */
+  public SslWebSocketServerFactory(SSLContext sslcontext) {
+    super(sslcontext);
+    initAttributes();
+  }
+
+  /**
+   * @param sslcontext SSL context handed to the parent factory
+   * @param exec executor service handed to the parent factory
+   */
+  public SslWebSocketServerFactory(SSLContext sslcontext, ExecutorService exec) {
+    super(sslcontext, exec);
+    initAttributes();
+  }
+
+  /** Resets the factory's own attributes: client authentication is not required. */
+  protected void initAttributes() {
+    this.needClientAuth = false;
+  }
+
+  /**
+   * Wraps an accepted channel in a server-mode SSL channel that applies the current
+   * {@code needClientAuth} setting.
+   */
+  @Override
+  public ByteChannel wrapChannel(SocketChannel channel, SelectionKey key)
+      throws IOException {
+    final SSLEngine engine = sslcontext.createSSLEngine();
+    engine.setUseClientMode(false);
+    engine.setNeedClientAuth(needClientAuth);
+    return new SSLSocketChannel2(channel, engine, exec, key);
+  }
+
+  /** @return whether client authentication is required */
+  public boolean getNeedClientAuth() {
+    return needClientAuth;
+  }
+
+  /** @param needClientAuth whether client authentication is required from now on */
+  public void setNeedClientAuth(boolean needClientAuth) {
+    this.needClientAuth = needClientAuth;
+  }
+}
+