You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by li...@apache.org on 2019/06/24 11:01:27 UTC

[zeppelin] branch master updated: [ZEPPELIN-3778] Cluster synchronize notes & authorization

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0845d35  [ZEPPELIN-3778] Cluster synchronize notes & authorization
0845d35 is described below

commit 0845d353c06ef12d06c2cfd586150e1cc3b887f3
Author: Xun Liu <li...@apache.org>
AuthorDate: Sun Jun 23 22:18:42 2019 +0800

    [ZEPPELIN-3778] Cluster synchronize notes & authorization
    
    ### What is this PR for?
    In cluster mode, a user can create, modify, or delete a note on any of the Zeppelin servers.
    Every such change must be broadcast to all the Zeppelin servers in the cluster so that the Notebook stays synchronized. Otherwise, the user will not be able to continue working after switching to another server.
    
    1. Listen for note create/delete/update events
    2. Listen for notebook authorization events
    3. Broadcast note update event
    
    ### What type of PR is it?
    [Feature]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-3778
    
    ### How should this be tested?
    * [CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/546647637)
    
    ### Screenshots (if appropriate)
    ### Sync note event
    ![syncNote](https://user-images.githubusercontent.com/3677382/59598792-9a50fc80-912f-11e9-940c-c983dff1f373.gif)
    
    ### Sync note Authorization
    ![sync-auth](https://user-images.githubusercontent.com/3677382/59598816-a5a42800-912f-11e9-93fd-9fba426ea7d6.gif)
    
    ### Questions:
    * Do the license files need an update?
    * Are there breaking changes for older versions?
    * Does this need documentation?
    
    Author: Xun Liu <li...@apache.org>
    
    Closes #3387 from liuxunorg/ZEPPELIN-3778 and squashes the following commits:
    
    437d108ff [Xun Liu] Organize the code format
    05b75ad12 [Xun Liu] [ZEPPELIN-3778] Cluster synchronize notes & authorization
---
 .../zeppelin/cluster/ClusterManagerServer.java     |  78 ++--
 .../zeppelin/cluster/event/ClusterEvent.java       |  18 +-
 .../zeppelin/cluster/event/ClusterMessage.java     |  59 +++
 .../zeppelin/conf/ZeppelinConfiguration.java       |   6 +
 .../launcher/ClusterInterpreterLauncher.java       |  15 +-
 .../org/apache/zeppelin/server/ZeppelinServer.java |   7 +-
 .../org/apache/zeppelin/socket/NotebookServer.java | 164 +++++++-
 .../cluster/ClusterAuthEventListenerTest.java      |  70 ++++
 .../apache/zeppelin/cluster/ClusterEventTest.java  | 440 +++++++++++++++++++++
 .../cluster/ClusterNoteAuthEventListenerTest.java  |  50 +++
 .../cluster/ClusterNoteEventListenerTest.java      |  79 ++++
 .../zeppelin/cluster/ZeppelinServerMock.java       | 366 +++++++++++++++++
 .../zeppelin/notebook/AuthorizationService.java    |  98 ++++-
 .../zeppelin/notebook/NotebookAuthorization.java   |  48 ++-
 14 files changed, 1450 insertions(+), 48 deletions(-)

diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index eb64393..e8434fb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -35,19 +35,10 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.cluster.event.ClusterEventListener;
 import org.apache.zeppelin.cluster.meta.ClusterMeta;
 import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -76,9 +67,15 @@ public class ClusterManagerServer extends ClusterManager {
   // Connect to the interpreter process that has been created
   public static String CONNET_EXISTING_PROCESS = "CONNET_EXISTING_PROCESS";
 
-  private List<ClusterEventListener> clusterEventListeners = new ArrayList<>();
+  private List<ClusterEventListener> clusterIntpEventListeners = new ArrayList<>();
+  private List<ClusterEventListener> clusterNoteEventListeners = new ArrayList<>();
+  private List<ClusterEventListener> clusterAuthEventListeners = new ArrayList<>();
+
   // zeppelin cluster event
-  public static String ZEPL_CLUSTER_EVENT_TOPIC = "ZEPL_CLUSTER_EVENT_TOPIC";
+  public static String CLUSTER_INTP_EVENT_TOPIC = "CLUSTER_INTP_EVENT_TOPIC";
+  public static String CLUSTER_NOTE_EVENT_TOPIC = "CLUSTER_NOTE_EVENT_TOPIC";
+  public static String CLUSTER_AUTH_EVENT_TOPIC = "CLUSTER_AUTH_EVENT_TOPIC";
+  public static String CLUSTER_NB_AUTH_EVENT_TOPIC = "CLUSTER_NB_AUTH_EVENT_TOPIC";
 
   private ClusterManagerServer() {
     super();
@@ -206,8 +203,12 @@ public class ClusterManagerServer extends ClusterManager {
         raftServer = builder.build();
         raftServer.bootstrap(clusterMemberIds);
 
-        messagingService.registerHandler(ZEPL_CLUSTER_EVENT_TOPIC,
-            subscribeClusterEvent, MoreExecutors.directExecutor());
+        messagingService.registerHandler(CLUSTER_INTP_EVENT_TOPIC,
+            subscribeClusterIntpEvent, MoreExecutors.directExecutor());
+        messagingService.registerHandler(CLUSTER_NOTE_EVENT_TOPIC,
+            subscribeClusterNoteEvent, MoreExecutors.directExecutor());
+        messagingService.registerHandler(CLUSTER_AUTH_EVENT_TOPIC,
+            subscribeClusterAuthEvent, MoreExecutors.directExecutor());
 
         LOGGER.info("RaftServer run() <<<");
       }
@@ -273,12 +274,12 @@ public class ClusterManagerServer extends ClusterManager {
     return idleNodeMeta;
   }
 
-  public void unicastClusterEvent(String host, int port,  String msg) {
+  public void unicastClusterEvent(String host, int port, String topic, String msg) {
     LOGGER.info("send unicastClusterEvent message {}", msg);
 
     Address address = Address.from(host, port);
     CompletableFuture<byte[]> response = messagingService.sendAndReceive(address,
-        ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
+        topic, msg.getBytes(), Duration.ofSeconds(2));
     response.whenComplete((r, e) -> {
       if (null == e) {
         LOGGER.error(e.getMessage(), e);
@@ -288,7 +289,7 @@ public class ClusterManagerServer extends ClusterManager {
     });
   }
 
-  public void broadcastClusterEvent(String msg) {
+  public void broadcastClusterEvent(String topic, String msg) {
     LOGGER.info("send broadcastClusterEvent message {}", msg);
 
     for (Node node : clusterNodes) {
@@ -299,7 +300,7 @@ public class ClusterManagerServer extends ClusterManager {
       }
 
       CompletableFuture<byte[]> response = messagingService.sendAndReceive(node.address(),
-          ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
+          topic, msg.getBytes(), Duration.ofSeconds(2));
       response.whenComplete((r, e) -> {
         if (null == e) {
           LOGGER.error(e.getMessage(), e);
@@ -310,18 +311,51 @@ public class ClusterManagerServer extends ClusterManager {
     }
   }
 
-  private BiFunction<Address, byte[], byte[]> subscribeClusterEvent = (address, data) -> {
+  private BiFunction<Address, byte[], byte[]> subscribeClusterIntpEvent = (address, data) -> {
+    String message = new String(data);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("subscribeClusterIntpEvent() {}", message);
+    }
+    for (ClusterEventListener eventListener : clusterIntpEventListeners) {
+      eventListener.onClusterEvent(message);
+    }
+
+    return null;
+  };
+
+  private BiFunction<Address, byte[], byte[]> subscribeClusterNoteEvent = (address, data) -> {
     String message = new String(data);
-    LOGGER.info("subscribeClusterEvent() {}", message);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("subscribeClusterNoteEvent() {}", message);
+    }
+    for (ClusterEventListener eventListener : clusterNoteEventListeners) {
+      eventListener.onClusterEvent(message);
+    }
 
-    for (ClusterEventListener eventListener : clusterEventListeners) {
+    return null;
+  };
+
+  private BiFunction<Address, byte[], byte[]> subscribeClusterAuthEvent = (address, data) -> {
+    String message = new String(data);
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("subscribeClusterAuthEvent() {}", message);
+    }
+    for (ClusterEventListener eventListener : clusterAuthEventListeners) {
       eventListener.onClusterEvent(message);
     }
 
     return null;
   };
 
-  public void addClusterEventListeners(ClusterEventListener listener) {
-    clusterEventListeners.add(listener);
+  public void addClusterEventListeners(String topic, ClusterEventListener listener) {
+    if (StringUtils.equals(topic, CLUSTER_INTP_EVENT_TOPIC)) {
+      clusterIntpEventListeners.add(listener);
+    } else if (StringUtils.equals(topic, CLUSTER_NOTE_EVENT_TOPIC)) {
+      clusterNoteEventListeners.add(listener);
+    } else if (StringUtils.equals(topic, CLUSTER_AUTH_EVENT_TOPIC)) {
+      clusterAuthEventListeners.add(listener);
+    } else {
+      LOGGER.error("Unknow cluster event topic : {}", topic);
+    }
   }
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
index 0e1120c..4fb61da 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
@@ -20,5 +20,21 @@ package org.apache.zeppelin.cluster.event;
  * Cluster Event
  */
 public enum ClusterEvent {
-  CREATE_INTP_PROCESS
+  // CLUSTER_INTP_EVENT_TOPIC
+  CREATE_INTP_PROCESS,
+  // CLUSTER_NOTE_EVENT_TOPIC
+  BROADCAST_NOTE,
+  BROADCAST_NOTE_LIST,
+  BROADCAST_PARAGRAPH,
+  BROADCAST_PARAGRAPHS,
+  BROADCAST_NEW_PARAGRAPH,
+  UPDATE_NOTE_PERMISSIONS,
+  // CLUSTER_AUTH_EVENT_TOPIC
+  SET_ROLES,
+  SET_READERS_PERMISSIONS,
+  SET_RUNNERS_PERMISSIONS,
+  SET_WRITERS_PERMISSIONS,
+  SET_OWNERS_PERMISSIONS,
+  CLEAR_PERMISSION,
+  SET_NEW_NOTE_PERMISSIONS
 }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
new file mode 100644
index 0000000..1fa6938
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster.event;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.zeppelin.display.Input;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ClusterMessage {
+  public ClusterEvent clusterEvent;
+  private Map<String, String> data = new HashMap<>();
+
+  private static Gson gson = new GsonBuilder()
+      .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
+      .setPrettyPrinting()
+      .registerTypeAdapterFactory(Input.TypeAdapterFactory).create();
+
+  public ClusterMessage(ClusterEvent event) {
+    this.clusterEvent = event;
+  }
+
+  public ClusterMessage put(String k, String v) {
+    data.put(k, v);
+    return this;
+  }
+
+  public String get(String k) {
+    return data.get(k);
+  }
+
+  public Map<String, String> getData() {
+    return data;
+  }
+
+  public static ClusterMessage deserializeMessage(String msg) {
+    return gson.fromJson(msg, ClusterMessage.class);
+  }
+
+  public static String serializeMessage(ClusterMessage m) {
+    return gson.toJson(m);
+  }
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index e16ade8..2818313 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.configuration.tree.ConfigurationNode;
@@ -312,6 +313,11 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     return getString(ConfVars.ZEPPELIN_ADDR);
   }
 
+  @VisibleForTesting
+  public void setServerPort(int port) {
+    properties.put(ConfVars.ZEPPELIN_PORT.getVarName(), String.valueOf(port));
+  }
+
   public int getServerPort() {
     return getInt(ConfVars.ZEPPELIN_PORT);
   }
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index 7d8ff1e..1fe77a03 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -56,7 +56,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
   public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage)
       throws IOException {
     super(zConf, recoveryStorage);
-    clusterServer.addClusterEventListeners(this);
+    clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, this);
   }
 
   @Override
@@ -104,7 +104,8 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
         mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS);
         mapEvent.put(CLUSTER_EVENT_MSG, sContext);
         String strEvent = gson.toJson(mapEvent);
-        clusterServer.unicastClusterEvent(srvHost, srvPort, strEvent);
+        clusterServer.unicastClusterEvent(
+            srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, strEvent);
 
         HashMap<String, Object> intpMeta = clusterServer
             .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
@@ -145,9 +146,13 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
   }
 
   @Override
-  public void onClusterEvent(String event) {
+  public void onClusterEvent(String msg) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(msg);
+    }
+
     Gson gson = new Gson();
-    Map<String, Object> mapEvent = gson.fromJson(event,
+    Map<String, Object> mapEvent = gson.fromJson(msg,
         new TypeToken<Map<String, Object>>(){}.getType());
     String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
     ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
@@ -157,7 +162,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
         onCreateIntpProcess(mapEvent);
         break;
       default:
-        LOGGER.error("Unknown Cluster Event : {}", clusterEvent);
+        LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
         break;
     }
   }
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 1e180d8..10ee180 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -353,7 +353,12 @@ public class ZeppelinServer extends ResourceConfig {
 
   private static void setupClusterManagerServer(ServiceLocator serviceLocator) {
     if (conf.isClusterMode()) {
-      ClusterManagerServer.getInstance().start();
+      ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance();
+      NotebookServer notebookServer = serviceLocator.getService(NotebookServer.class);
+      AuthorizationService authorizationService = serviceLocator.getService(AuthorizationService.class);
+      clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, notebookServer);
+      clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, authorizationService);
+      clusterManagerServer.start();
     }
   }
 
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index bc482df..20527b9 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -42,6 +42,10 @@ import javax.servlet.http.HttpServletRequest;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.apache.zeppelin.cluster.event.ClusterEvent;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.cluster.event.ClusterMessage;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -104,7 +108,8 @@ public class NotebookServer extends WebSocketServlet
         RemoteInterpreterProcessListener,
         ApplicationEventListener,
         ParagraphJobListener,
-        NoteEventListener {
+        NoteEventListener,
+        ClusterEventListener {
 
   /**
    * Job manager service type.
@@ -525,50 +530,177 @@ public class NotebookServer extends WebSocketServlet
   }
 
   public void broadcastNote(Note note) {
-    connectionManager.broadcast(note.getId(), new Message(OP.NOTE).put("note", note));
+    inlineBroadcastNote(note);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, note);
   }
 
-  public void broadcastParagraph(Note note, Paragraph p) {
+  private void inlineBroadcastNote(Note note) {
+    Message message = new Message(OP.NOTE).put("note", note);
+    connectionManager.broadcast(note.getId(), message);
+  }
+
+  private void inlineBroadcastParagraph(Note note, Paragraph p) {
     broadcastNoteForms(note);
 
     if (note.isPersonalizedMode()) {
       broadcastParagraphs(p.getUserParagraphMap(), p);
     } else {
-      connectionManager.broadcast(note.getId(),
-          new Message(OP.PARAGRAPH).put("paragraph", new ParagraphWithRuntimeInfo(p)));
+      Message message = new Message(OP.PARAGRAPH).put("paragraph", new ParagraphWithRuntimeInfo(p));
+      connectionManager.broadcast(note.getId(), message);
     }
   }
 
-  public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
-                                  Paragraph defaultParagraph) {
+  public void broadcastParagraph(Note note, Paragraph p) {
+    inlineBroadcastParagraph(note, p);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, note, p);
+  }
+
+  private void inlineBroadcastParagraphs(Map<String, Paragraph> userParagraphMap,
+                                         Paragraph defaultParagraph) {
     if (null != userParagraphMap) {
       for (String user : userParagraphMap.keySet()) {
-        connectionManager.multicastToUser(user,
-            new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)));
+        Message message = new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user));
+        connectionManager.multicastToUser(user, message);
       }
     }
   }
 
-  private void broadcastNewParagraph(Note note, Paragraph para) {
+  private void broadcastParagraphs(Map<String, Paragraph> userParagraphMap,
+                                  Paragraph defaultParagraph) {
+    inlineBroadcastParagraphs(userParagraphMap, defaultParagraph);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, userParagraphMap, defaultParagraph);
+  }
+
+  private void inlineBroadcastNewParagraph(Note note, Paragraph para) {
     LOG.info("Broadcasting paragraph on run call instead of note.");
     int paraIndex = note.getParagraphs().indexOf(para);
-    connectionManager.broadcast(note.getId(),
-        new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex));
+
+    Message message = new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex);
+    connectionManager.broadcast(note.getId(), message);
   }
 
-  public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
+  private void broadcastNewParagraph(Note note, Paragraph para) {
+    inlineBroadcastNewParagraph(note, para);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, note, para);
+  }
+
+  public void inlineBroadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
     if (subject == null) {
       subject = new AuthenticationInfo(StringUtils.EMPTY);
     }
     //send first to requesting user
     List<NoteInfo> notesInfo = getNotebook().getNotesInfo(
-            noteId -> getNotebookAuthorizationService().isReader(noteId, userAndRoles));
-    connectionManager.multicastToUser(subject.getUser(),
-        new Message(OP.NOTES_INFO).put("notes", notesInfo));
+        noteId -> getNotebookAuthorizationService().isReader(noteId, userAndRoles));
+    Message message = new Message(OP.NOTES_INFO).put("notes", notesInfo);
+    connectionManager.multicastToUser(subject.getUser(), message);
     //to others afterwards
     connectionManager.broadcastNoteListExcept(notesInfo, subject);
   }
 
+  public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) {
+    inlineBroadcastNoteList(subject, userAndRoles);
+    broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, subject, userAndRoles);
+  }
+
+  // broadcast ClusterEvent
+  private void broadcastClusterEvent(ClusterEvent event, Object... objects) {
+    ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+    if (!conf.isClusterMode()) {
+      return;
+    }
+
+    ClusterMessage clusterMessage = new ClusterMessage(event);
+
+    for(Object object : objects) {
+      String json = "";
+      if (object instanceof AuthenticationInfo) {
+        json = ((AuthenticationInfo) object).toJson();
+        clusterMessage.put("AuthenticationInfo", json);
+      } else if (object instanceof Note) {
+        json = ((Note) object).toJson();
+        clusterMessage.put("Note", json);
+      } else if (object instanceof Paragraph) {
+        json = ((Paragraph) object).toJson();
+        clusterMessage.put("Paragraph", json);
+      } else if (object instanceof Set) {
+        Gson gson = new Gson();
+        json = gson.toJson(object);
+        clusterMessage.put("Set<String>", json);
+      } else if (object instanceof Map) {
+        Gson gson = new Gson();
+        json = gson.toJson(object);
+        clusterMessage.put("Map<String, Paragraph>", json);
+      } else {
+        LOG.error("Unknown object type!");
+      }
+    }
+
+    String msg = ClusterMessage.serializeMessage(clusterMessage);
+    ClusterManagerServer.getInstance().broadcastClusterEvent(
+        ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, msg);
+  }
+
+  @Override
+  public void onClusterEvent(String msg) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("onClusterEvent : {}", msg);
+    }
+    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+
+    Note note = null;
+    Paragraph paragraph = null;
+    Set<String> userAndRoles = null;
+    Map<String, Paragraph> userParagraphMap = null;
+    AuthenticationInfo authenticationInfo = null;
+    for (Map.Entry<String, String> entry : message.getData().entrySet()) {
+      String key = entry.getKey();
+      String json = entry.getValue();
+      if (StringUtils.equals(key, "AuthenticationInfo")) {
+        authenticationInfo = AuthenticationInfo.fromJson(json);
+      } else if (StringUtils.equals(key, "Note")) {
+        note = Note.fromJson(json);
+      } else if (StringUtils.equals(key, "Paragraph")) {
+        paragraph = Paragraph.fromJson(json);
+      } else if (StringUtils.equals(key, "Set<String>")) {
+        Gson gson = new Gson();
+        userAndRoles = gson.fromJson(json, new TypeToken<Set<String>>() {
+        }.getType());
+      } else if (StringUtils.equals(key, "Map<String, Paragraph>")) {
+        Gson gson = new Gson();
+        userParagraphMap = gson.fromJson(json, new TypeToken<Map<String, Paragraph>>() {
+        }.getType());
+      } else {
+        LOG.error("Unknown key:{}, json:{}!" + key, json);
+      }
+    }
+
+    switch (message.clusterEvent) {
+      case BROADCAST_NOTE:
+        inlineBroadcastNote(note);
+        break;
+      case BROADCAST_NOTE_LIST:
+        try {
+          getNotebook().reloadAllNotes(authenticationInfo);
+          inlineBroadcastNoteList(authenticationInfo, userAndRoles);
+        } catch (IOException e) {
+          LOG.error(e.getMessage(), e);
+        }
+        break;
+      case BROADCAST_PARAGRAPH:
+        inlineBroadcastParagraph(note, paragraph);
+        break;
+      case BROADCAST_PARAGRAPHS:
+        inlineBroadcastParagraphs(userParagraphMap, paragraph);
+        break;
+      case BROADCAST_NEW_PARAGRAPH:
+        inlineBroadcastNewParagraph(note, paragraph);
+        break;
+      default:
+        LOG.error("Unknown clusterEvent:{}, msg:{} ", message.clusterEvent, msg);
+        break;
+    }
+  }
+
   public void listNotesInfo(NotebookSocket conn, Message message) throws IOException {
     getNotebookService().listNotesInfo(false, getServiceContext(message),
         new WebSocketServiceCallback<List<NoteInfo>>(conn) {
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java
new file mode 100644
index 0000000..15bc23e
--- /dev/null
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterAuthEventListenerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.cluster.event.ClusterMessage;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class ClusterAuthEventListenerTest implements ClusterEventListener {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterAuthEventListenerTest.class);
+
+  public String receiveMsg = null;
+
+  @Override
+  public void onClusterEvent(String msg) {
+    receiveMsg = msg;
+    LOGGER.info("onClusterEvent : {}", msg);
+    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+
+    String noteId = message.get("noteId");
+    String user = message.get("user");
+    String jsonSet = message.get("set");
+    Gson gson = new Gson();
+    Set<String> set  = gson.fromJson(jsonSet, new TypeToken<Set<String>>() {}.getType());
+
+    assertNotNull(set);
+    switch (message.clusterEvent) {
+      case SET_READERS_PERMISSIONS:
+      case SET_WRITERS_PERMISSIONS:
+      case SET_OWNERS_PERMISSIONS:
+      case SET_RUNNERS_PERMISSIONS:
+      case CLEAR_PERMISSION:
+        assertNotNull(noteId);
+        break;
+      case SET_ROLES:
+        assertNotNull(user);
+        break;
+      default:
+        receiveMsg = null;
+        fail("Unknown clusterEvent : " + message.clusterEvent);
+        break;
+    }
+  }
+}
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
new file mode 100644
index 0000000..7e18987
--- /dev/null
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterEventTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.apache.zeppelin.interpreter.thrift.ParagraphInfo;
+import org.apache.zeppelin.interpreter.thrift.ServiceException;
+import org.apache.zeppelin.notebook.AuthorizationService;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
+import org.apache.zeppelin.notebook.scheduler.SchedulerService;
+import org.apache.zeppelin.rest.message.NewParagraphRequest;
+import org.apache.zeppelin.service.ConfigurationService;
+import org.apache.zeppelin.service.NotebookService;
+import org.apache.zeppelin.socket.NotebookServer;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.apache.zeppelin.utils.TestUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class ClusterEventTest extends ZeppelinServerMock {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterEventTest.class);
+
+  // One listener of each kind is registered per mock cluster server in
+  // startOtherZeppelinClusterNode(); the check*Listener() helpers iterate these lists.
+  private static List<ClusterAuthEventListenerTest> clusterAuthEventListenerTests = new ArrayList<>();
+  private static List<ClusterNoteEventListenerTest> clusterNoteEventListenerTests = new ArrayList<>();
+  private static List<ClusterNoteAuthEventListenerTest> clusterNoteAuthEventListenerTests = new ArrayList<>();
+
+  // Mock cluster manager servers started by this test class (shut down in destroy()).
+  private static List<ClusterManagerServer> clusterServers = new ArrayList<>();
+  private static ClusterManagerClient clusterClient = null;
+  // Key handed to clusterClient.start(...).
+  static final String metaKey = "ClusterMultiNodeTestKey";
+
+  // Services built once in init() and shared by all test methods.
+  private static Notebook notebook;
+  private static NotebookServer notebookServer;
+  private static SchedulerService schedulerService;
+  private static NotebookService notebookService;
+  private static AuthorizationService authorizationService;
+  // Re-created before every test in setUp().
+  private HttpServletRequest mockRequest;
+  private AuthenticationInfo anonymous;
+
+  // Used to decode REST responses in the tests.
+  Gson gson = new Gson();
+
+  /**
+   * One-time fixture: starts the mock Zeppelin server with a generated cluster address
+   * list, builds the notebook-related services, stubs them into a spied
+   * {@link NotebookServer} and finally starts the remaining cluster nodes.
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    final ZeppelinConfiguration clusterConf = genZeppelinConf();
+    ZeppelinServerMock.startUp(ClusterEventTest.class.getSimpleName(), clusterConf);
+
+    notebook = TestUtils.getInstance(Notebook.class);
+    authorizationService = new AuthorizationService(notebook, notebook.getConf());
+
+    final ZeppelinConfiguration serviceConf = ZeppelinConfiguration.create();
+    schedulerService = new QuartzSchedulerService(serviceConf, notebook);
+    notebookServer = spy(NotebookServer.getInstance());
+    notebookService =
+        new NotebookService(notebook, authorizationService, serviceConf, schedulerService);
+
+    // Route the spied server to the services created above.
+    final ConfigurationService confService = new ConfigurationService(notebook.getConf());
+    when(notebookServer.getNotebookService()).thenReturn(notebookService);
+    when(notebookServer.getConfigurationService()).thenReturn(confService);
+
+    startOtherZeppelinClusterNode(clusterConf);
+  }
+
+  /**
+   * One-time teardown: stops the mock Zeppelin server, then the cluster client and
+   * every mock cluster server started by this class.
+   *
+   * <p>The cluster components are shut down in a {@code finally} block so they are
+   * released even when stopping the Zeppelin server throws.
+   *
+   * @throws Exception if stopping the Zeppelin server or a cluster component fails
+   */
+  @AfterClass
+  public static void destroy() throws Exception {
+    try {
+      ZeppelinServerMock.shutDown();
+    } finally {
+      if (null != clusterClient) {
+        clusterClient.shutdown();
+      }
+      for (ClusterManagerServer clusterServer : clusterServers) {
+        clusterServer.shutdown();
+      }
+      LOGGER.info("stopCluster <<<");
+    }
+  }
+
+  /** Creates a fresh anonymous identity and a mocked servlet request before each test. */
+  @Before
+  public void setUp() {
+    anonymous = new AuthenticationInfo("anonymous");
+    mockRequest = mock(HttpServletRequest.class);
+  }
+
+  private static ZeppelinConfiguration genZeppelinConf()
+      throws IOException, InterruptedException {
+    String clusterAddrList = "";
+    String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
+    for (int i = 0; i < 3; i ++) {
+      // Set the cluster IP and port
+      int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+      clusterAddrList += zServerHost + ":" + zServerPort;
+      if (i != 2) {
+        clusterAddrList += ",";
+      }
+    }
+    ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+    zconf.setClusterAddress(clusterAddrList);
+    LOGGER.info("clusterAddrList = {}", clusterAddrList);
+
+    return zconf;
+  }
+
+  /**
+   * Creates one {@link ClusterManagerServer} through its no-arg declared constructor
+   * (made accessible reflectively), initialises it as a test cluster node and registers
+   * the shared notebook server and authorization service as event listeners.
+   *
+   * <p>The returned server has not been started.
+   *
+   * @param clusterAddrList comma-separated address list of the whole cluster
+   * @param clusterHost host of this node
+   * @param clusterPort port of this node
+   * @return the initialised server
+   */
+  public static ClusterManagerServer startClusterSingleNode(String clusterAddrList, String clusterHost, int clusterPort)
+      throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+    // Typed constructor handle: avoids raw types and the unchecked cast on newInstance().
+    Constructor<ClusterManagerServer> constructor =
+        ClusterManagerServer.class.getDeclaredConstructor();
+    constructor.setAccessible(true);
+    ClusterManagerServer clusterServer = constructor.newInstance();
+    clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
+
+    clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, notebookServer);
+    clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, authorizationService);
+    return clusterServer;
+  }
+
+  /**
+   * Starts the mock cluster nodes for the first two entries of the configured cluster
+   * address list, attaches one test listener per topic to each of them, starts the
+   * cluster client and then polls until the cluster reports that it is initialised.
+   *
+   * @param zconf configuration carrying the cluster address list
+   * @throws IOException declared for callers; not thrown by the visible code paths
+   * @throws InterruptedException if the thread is interrupted while waiting for startup
+   */
+  public static void startOtherZeppelinClusterNode(ZeppelinConfiguration zconf)
+      throws IOException, InterruptedException {
+    LOGGER.info("startCluster >>>");
+    String clusterAddrList = zconf.getClusterAddress();
+
+    // mock cluster manager server
+    String[] cluster = clusterAddrList.split(",");
+    try {
+      // NOTE: cluster[2] is ZeppelinServerMock
+      for (int i = 0; i < 2; i++) {
+        String[] parts = cluster[i].split(":");
+        String clusterHost = parts[0];
+        int clusterPort = Integer.parseInt(parts[1]);
+
+        ClusterManagerServer clusterServer
+            = startClusterSingleNode(clusterAddrList, clusterHost, clusterPort);
+        clusterServers.add(clusterServer);
+      }
+    } catch (Exception e) {
+      // Deliberately best-effort: a node that could not be created is only logged here.
+      LOGGER.error(e.getMessage(), e);
+    }
+
+    for (ClusterManagerServer clusterServer : clusterServers) {
+      ClusterAuthEventListenerTest clusterAuthEventListenerTest = new ClusterAuthEventListenerTest();
+      clusterAuthEventListenerTests.add(clusterAuthEventListenerTest);
+      clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, clusterAuthEventListenerTest);
+
+      ClusterNoteEventListenerTest clusterNoteEventListenerTest = new ClusterNoteEventListenerTest();
+      clusterNoteEventListenerTests.add(clusterNoteEventListenerTest);
+      clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC, clusterNoteEventListenerTest);
+
+      ClusterNoteAuthEventListenerTest clusterNoteAuthEventListenerTest = new ClusterNoteAuthEventListenerTest();
+      clusterNoteAuthEventListenerTests.add(clusterNoteAuthEventListenerTest);
+      clusterServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NB_AUTH_EVENT_TOPIC, clusterNoteAuthEventListenerTest);
+
+      clusterServer.start();
+    }
+
+    // mock cluster manager client
+    clusterClient = ClusterManagerClient.getInstance();
+    clusterClient.start(metaKey);
+
+    // Waiting for cluster startup: up to 100 polls, one every pollIntervalMs.
+    final long pollIntervalMs = 500;
+    int wait = 0;
+    while (wait++ < 100) {
+      if (clusterIsStartup() && clusterClient.raftInitialized()) {
+        // (wait - 1) sleeps have elapsed before this successful poll.
+        LOGGER.info("wait {}(ms) found cluster leader", (wait - 1) * pollIntervalMs);
+        break;
+      }
+      // Let an interrupt propagate instead of swallowing it and polling on.
+      Thread.sleep(pollIntervalMs);
+    }
+
+    Thread.sleep(3000);
+    assertEquals(true, clusterIsStartup());
+
+    getClusterServerMeta();
+    LOGGER.info("startCluster <<<");
+  }
+
+  private void checkClusterNoteEventListener() {
+    for (ClusterNoteEventListenerTest clusterNoteEventListenerTest : clusterNoteEventListenerTests) {
+      assertNotNull(clusterNoteEventListenerTest.receiveMsg);
+    }
+  }
+
+  private void checkClusterAuthEventListener() {
+    for (ClusterAuthEventListenerTest clusterAuthEventListenerTest : clusterAuthEventListenerTests) {
+      assertNotNull(clusterAuthEventListenerTest.receiveMsg);
+    }
+  }
+
+  private void checkClusterNoteAuthEventListener() {
+    for (ClusterNoteAuthEventListenerTest clusterNoteAuthEventListenerTest : clusterNoteAuthEventListenerTests) {
+      assertNotNull(clusterNoteAuthEventListenerTest.receiveMsg);
+    }
+  }
+
+  /**
+   * Reports whether every mock cluster server started by this class has its raft
+   * layer initialised.
+   *
+   * @return {@code false} as soon as one server is not initialised; {@code true}
+   *         otherwise (including when no server has been registered)
+   */
+  static boolean clusterIsStartup() {
+    for (ClusterManagerServer clusterServer : clusterServers) {
+      if (!clusterServer.raftInitialized()) {
+        LOGGER.warn("clusterServer not Initialized!");
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Reads the server and interpreter-process metadata through the cluster client, logs
+   * both and asserts that the server metadata is a {@link HashMap} with three entries.
+   */
+  public static void getClusterServerMeta() {
+    LOGGER.info("getClusterServerMeta >>>");
+    // Get metadata for all services
+    Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+    // Assert before dereferencing so a missing result fails as an assertion, not an NPE.
+    assertNotNull(srvMeta);
+    LOGGER.info(srvMeta.toString());
+
+    Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+    // intpMeta is only logged; String.valueOf tolerates null.
+    LOGGER.info(String.valueOf(intpMeta));
+
+    assertEquals(true, (srvMeta instanceof HashMap));
+    HashMap<?, ?> serverMetaMap = (HashMap<?, ?>) srvMeta;
+
+    // JUnit's argument order is (expected, actual).
+    assertEquals(3, serverMetaMap.size());
+    LOGGER.info("getClusterServerMeta <<<");
+  }
+
+  /**
+   * Renames a note through the REST API, verifies the new name on the note object and
+   * then checks that every note event listener holds a cluster message.
+   */
+  @Test
+  public void testRenameNoteEvent() throws IOException {
+    Note note = null;
+    try {
+      String oldName = "old_name";
+      note = TestUtils.getInstance(Notebook.class).createNote(oldName, anonymous);
+      // JUnit's argument order is (expected, actual).
+      assertEquals(oldName, note.getName());
+      String noteId = note.getId();
+
+      final String newName = "testName";
+      // Quote the value so the request body is well-formed JSON.
+      String jsonRequest = "{\"name\": \"" + newName + "\"}";
+
+      PutMethod put = httpPut("/notebook/" + noteId + "/rename/", jsonRequest);
+      assertThat("test testRenameNote:", put, isAllowed());
+      put.releaseConnection();
+
+      assertEquals(newName, note.getName());
+
+      // wait cluster sync event
+      Thread.sleep(1000);
+      checkClusterNoteEventListener();
+    } catch (InterruptedException e) {
+      LOGGER.error(e.getMessage(), e);
+    } finally {
+      // cleanup
+      if (null != note) {
+        TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous);
+      }
+    }
+  }
+  /**
+   * Posts to an existing note's REST path (the clone request, judging by the test name),
+   * fetches the note whose id comes back in the response body, and then checks that
+   * every note event listener holds a cluster message.
+   */
+  @Test
+  public void testCloneNoteEvent() throws IOException {
+    Note sourceNote = null;
+    String clonedNoteId = null;
+    try {
+      sourceNote = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+
+      PostMethod cloneRequest = httpPost("/notebook/" + sourceNote.getId(), "");
+      LOG.info("testCloneNote response\n" + cloneRequest.getResponseBodyAsString());
+      assertThat(cloneRequest, isAllowed());
+      Map<String, Object> cloneResponse = gson.fromJson(
+          cloneRequest.getResponseBodyAsString(),
+          new TypeToken<Map<String, Object>>() {}.getType());
+      clonedNoteId = (String) cloneResponse.get("body");
+      cloneRequest.releaseConnection();
+
+      GetMethod fetchClone = httpGet("/notebook/" + clonedNoteId);
+      assertThat(fetchClone, isAllowed());
+      Map<String, Object> fetchResponse = gson.fromJson(
+          fetchClone.getResponseBodyAsString(),
+          new TypeToken<Map<String, Object>>() {}.getType());
+      // Not read afterwards; the cast still requires the body to be a Map (or null).
+      Map<String, Object> clonedNoteBody = (Map<String, Object>) fetchResponse.get("body");
+
+      fetchClone.releaseConnection();
+
+      // wait cluster sync event
+      Thread.sleep(1000);
+      checkClusterNoteEventListener();
+    } catch (InterruptedException e) {
+      LOGGER.error(e.getMessage(), e);
+    } finally {
+      // cleanup: remove both the source note and its clone
+      if (null != sourceNote) {
+        TestUtils.getInstance(Notebook.class).removeNote(sourceNote.getId(), anonymous);
+      }
+      if (null != clonedNoteId) {
+        TestUtils.getInstance(Notebook.class).removeNote(clonedNoteId, anonymous);
+      }
+    }
+  }
+
+  /**
+   * Creates a note with one paragraph that has an explicit result, inserts a further
+   * paragraph through the REST API and then checks that every note event listener
+   * holds a cluster message.
+   */
+  @Test
+  public void insertParagraphEvent() throws IOException {
+    Note note = null;
+    try {
+      // Create note and set result explicitly
+      note = TestUtils.getInstance(Notebook.class).createNote("note1", anonymous);
+      Paragraph firstParagraph = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      firstParagraph.setResult(new InterpreterResult(InterpreterResult.Code.SUCCESS,
+          InterpreterResult.Type.TEXT, "result"));
+
+      // insert new paragraph
+      NewParagraphRequest insertRequest = new NewParagraphRequest();
+      String paragraphPath = "/notebook/" + note.getId() + "/paragraph";
+      PostMethod post = httpPost(paragraphPath, insertRequest.toJson());
+      LOG.info("test clear paragraph output response\n" + post.getResponseBodyAsString());
+      assertThat(post, isAllowed());
+      post.releaseConnection();
+
+      // wait cluster sync event
+      Thread.sleep(1000);
+      checkClusterNoteEventListener();
+    } catch (InterruptedException e) {
+      LOGGER.error(e.getMessage(), e);
+    } finally {
+      // cleanup
+      if (null != note) {
+        TestUtils.getInstance(Notebook.class).removeNote(note.getId(), anonymous);
+      }
+    }
+  }
+
+  /**
+   * Applies each authorization change (owners, readers, runners, writers, roles, clear)
+   * to a freshly created note and, after each one, waits and checks that every
+   * authorization event listener holds a cluster message.
+   */
+  @Test
+  public void testClusterAuthEvent() throws IOException {
+    Note note = null;
+
+    try {
+      note = notebook.createNote("note1", anonymous);
+      Paragraph paragraph = note.addNewParagraph(anonymous);
+      paragraph.setText("%md start remote interpreter process");
+      paragraph.setAuthenticationInfo(anonymous);
+      notebookServer.getNotebook().saveNote(note, anonymous);
+
+      String noteId = note.getId();
+      String user1Id = "user1";
+      String user2Id = "user2";
+
+      // test user1 can get anonymous's note
+      List<ParagraphInfo> visibleToUser1 = null;
+      try {
+        visibleToUser1 = notebookServer.getParagraphList(user1Id, noteId);
+      } catch (ServiceException e) {
+        LOGGER.error(e.getMessage(), e);
+      } catch (TException e) {
+        LOGGER.error(e.getMessage(), e);
+      }
+      assertNotNull(user1Id + " can get anonymous's note", visibleToUser1);
+
+      // hand each permission on the note to user2, checking the listeners after each change
+      authorizationService.setOwners(noteId, new HashSet<>(Arrays.asList(user2Id)));
+      awaitAuthEventSync();
+
+      authorizationService.setReaders(noteId, new HashSet<>(Arrays.asList(user2Id)));
+      awaitAuthEventSync();
+
+      authorizationService.setRunners(noteId, new HashSet<>(Arrays.asList(user2Id)));
+      awaitAuthEventSync();
+
+      authorizationService.setWriters(noteId, new HashSet<>(Arrays.asList(user2Id)));
+      awaitAuthEventSync();
+
+      // give user2 the admin role
+      Set<String> roles = Sets.newHashSet("admin");
+      authorizationService.setRoles(user2Id, roles);
+      awaitAuthEventSync();
+
+      authorizationService.clearPermission(noteId);
+      awaitAuthEventSync();
+    } catch (InterruptedException e) {
+      LOGGER.error(e.getMessage(), e);
+    } finally {
+      if (null != note) {
+        notebook.removeNote(note.getId(), anonymous);
+      }
+    }
+  }
+
+  /** Waits one second for the cluster sync event, then checks the auth event listeners. */
+  private void awaitAuthEventSync() throws InterruptedException {
+    Thread.sleep(1000);
+    checkClusterAuthEventListener();
+  }
+}
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
new file mode 100644
index 0000000..f2ac6b2
--- /dev/null
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteAuthEventListenerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.cluster.event.ClusterMessage;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Test listener for note-authorization cluster events: it keeps the raw text of the
+ * last message it was handed and asserts that the expected fields are present.
+ */
+public class ClusterNoteAuthEventListenerTest implements ClusterEventListener {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterNoteAuthEventListenerTest.class);
+
+  // Raw text of the last event delivered to this listener.
+  public String receiveMsg = null;
+
+  /**
+   * Stores the message, decodes it and asserts that "noteId", the serialized "subject"
+   * and the {@link AuthenticationInfo} parsed from it are all non-null.
+   *
+   * @param msg serialized cluster message
+   */
+  @Override
+  public void onClusterEvent(String msg) {
+    receiveMsg = msg;
+    LOGGER.info("onClusterEvent : {}", msg);
+
+    final ClusterMessage event = ClusterMessage.deserializeMessage(msg);
+    final String noteId = event.get("noteId");
+    final String subjectJson = event.get("subject");
+    final AuthenticationInfo subject = AuthenticationInfo.fromJson(subjectJson);
+
+    assertNotNull(noteId);
+    assertNotNull(subjectJson);
+    assertNotNull(subject);
+  }
+}
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
new file mode 100644
index 0000000..a8d9444
--- /dev/null
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.cluster.event.ClusterMessage;
+import org.apache.zeppelin.notebook.Note;
+import org.apache.zeppelin.notebook.Paragraph;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class ClusterNoteEventListenerTest implements ClusterEventListener {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterNoteEventListenerTest.class);
+
+  public String receiveMsg = null;
+
+  @Override
+  public void onClusterEvent(String msg) {
+    receiveMsg = msg;
+    LOGGER.info("onClusterEvent : {}", msg);
+    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+
+    Note note = null;
+    Paragraph paragraph = null;
+    Set<String> userAndRoles = null;
+    Map<String, Paragraph> userParagraphMap = null;
+    AuthenticationInfo authenticationInfo = null;
+    for (Map.Entry<String, String> entry : message.getData().entrySet()) {
+      String key = entry.getKey();
+      String json = entry.getValue();
+      if (key.equals("AuthenticationInfo")) {
+        authenticationInfo = AuthenticationInfo.fromJson(json);
+        LOGGER.info(authenticationInfo.toJson());
+      } else if (key.equals("Note")) {
+        note = Note.fromJson(json);
+        LOGGER.info(note.toJson());
+      } else if (key.equals("Paragraph")) {
+        paragraph = Paragraph.fromJson(json);
+        LOGGER.info(paragraph.toJson());
+      } else if (key.equals("Set<String>")) {
+        Gson gson = new Gson();
+        userAndRoles = gson.fromJson(json, new TypeToken<Set<String>>() {
+        }.getType());
+        LOGGER.info(userAndRoles.toString());
+      } else if (key.equals("Map<String, Paragraph>")) {
+        Gson gson = new Gson();
+        userParagraphMap = gson.fromJson(json, new TypeToken<Map<String, Paragraph>>() {
+        }.getType());
+        LOGGER.info(userParagraphMap.toString());
+      } else {
+        receiveMsg = null;
+        fail("Unknown clusterEvent : " + message.clusterEvent);
+      }
+    }
+  }
+}
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java
new file mode 100644
index 0000000..a78a582
--- /dev/null
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import org.apache.commons.httpclient.Header;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethodBase;
+import org.apache.commons.httpclient.cookie.CookiePolicy;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.commons.httpclient.methods.RequestEntity;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.notebook.Notebook;
+import org.apache.zeppelin.plugin.PluginManager;
+import org.apache.zeppelin.server.ZeppelinServer;
+import org.apache.zeppelin.utils.TestUtils;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.regex.Pattern;
+
+public class ZeppelinServerMock {
+  protected static final Logger LOG = LoggerFactory.getLogger(ZeppelinServerMock.class);
+
+  // Path prefix of the REST API, appended to the default base URL.
+  static final String REST_API_URL = "/api";
+  // Base URL used by the http* helpers; resolved once by getUrlToTest().
+  static final String URL = getUrlToTest();
+  // Result of probing the server when this class is initialised. When true, start() and
+  // shutDown() do nothing, i.e. a server that was already running is left alone.
+  protected static final boolean WAS_RUNNING = checkIfServerIsRunning();
+
+  // Directories assigned in start(); confDir is deleted again in shutDown(boolean)
+  // when requested and recovery is not enabled.
+  protected static File zeppelinHome;
+  protected static File confDir;
+  protected static File notebookDir;
+
+  private String getUrl(String path) {
+    String url;
+    if (System.getProperty("url") != null) {
+      url = System.getProperty("url");
+    } else {
+      url = "http://localhost:8080";
+    }
+    url += REST_API_URL;
+    if (path != null) {
+      url += path;
+    }
+
+    return url;
+  }
+
+  protected static String getUrlToTest() {
+    String url = "http://localhost:8080" + REST_API_URL;
+    if (System.getProperty("url") != null) {
+      url = System.getProperty("url");
+    }
+    return url;
+  }
+
+  // Single-thread executor created in start() to run SERVER; shut down in shutDown(boolean).
+  static ExecutorService executor;
+  /**
+   * Task that clears the {@link TestUtils} instances and then runs
+   * {@link ZeppelinServer#main(String[])} on the calling thread. Any exception is logged
+   * and rethrown wrapped in a {@link RuntimeException}.
+   */
+  protected static final Runnable SERVER = new Runnable() {
+    @Override
+    public void run() {
+      try {
+        TestUtils.clearInstances();
+        ZeppelinServer.main(new String[]{""});
+      } catch (Exception e) {
+        // NOTE(review): the message mentions WebDriverManager, which this class never
+        // uses; it looks copied from another test helper. Confirm before rewording.
+        LOG.error("Exception in WebDriverManager while getWebDriver ", e);
+        throw new RuntimeException(e);
+      }
+    }
+  };
+
+  private static void start(String testClassName, boolean cleanData, ZeppelinConfiguration zconf)
+      throws Exception {
+    LOG.info("Starting ZeppelinServer testClassName: {}", testClassName);
+
+    if (!WAS_RUNNING) {
+      // copy the resources files to a temp folder
+      zeppelinHome = new File("..");
+      LOG.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
+      confDir = new File(zeppelinHome, "conf_" + testClassName);
+      confDir.mkdirs();
+      zconf.save(confDir + "/zeppelin-site.xml");
+
+      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
+          zeppelinHome.getAbsolutePath());
+      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(),
+          new File("../zeppelin-web/dist").getAbsolutePath());
+      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(),
+          confDir.getAbsolutePath());
+      System.setProperty(
+          ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT.getVarName(),
+          "spark");
+      notebookDir = new File(zeppelinHome.getAbsolutePath() + "/notebook_" + testClassName);
+      if (cleanData) {
+        FileUtils.deleteDirectory(notebookDir);
+      }
+      System.setProperty(
+          ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(),
+          notebookDir.getPath()
+      );
+
+      // some test profile does not build zeppelin-web.
+      // to prevent zeppelin starting up fail, create zeppelin-web/dist directory
+      new File("../zeppelin-web/dist").mkdirs();
+
+      LOG.info("Staring test Zeppelin up...");
+      ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+
+      executor = Executors.newSingleThreadExecutor();
+      executor.submit(SERVER);
+      long s = System.currentTimeMillis();
+      boolean started = false;
+      while (System.currentTimeMillis() - s < 1000 * 60 * 3) {  // 3 minutes
+        Thread.sleep(2000);
+        started = checkIfServerIsRunning();
+        if (started == true) {
+          break;
+        }
+      }
+      if (started == false) {
+        throw new RuntimeException("Can not start Zeppelin server");
+      }
+      //ZeppelinServer.notebook.setParagraphJobListener(NotebookServer.getInstance());
+      LOG.info("Test Zeppelin stared.");
+    }
+  }
+
+  protected static void startUp(String testClassName, ZeppelinConfiguration zconf) throws Exception {
+    start(testClassName, true, zconf);
+  }
+
+  protected static void shutDown() throws Exception {
+    shutDown(true);
+  }
+
+  protected static void shutDown(final boolean deleteConfDir) throws Exception {
+    if (!WAS_RUNNING && TestUtils.getInstance(Notebook.class) != null) {
+      // restart interpreter to stop all interpreter processes
+      List<InterpreterSetting> settingList = TestUtils.getInstance(Notebook.class).getInterpreterSettingManager()
+          .get();
+      if (!TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled()) {
+        for (InterpreterSetting setting : settingList) {
+          TestUtils.getInstance(Notebook.class).getInterpreterSettingManager().restart(setting.getId());
+        }
+      }
+      LOG.info("Terminating test Zeppelin...");
+      ZeppelinServer.jettyWebServer.stop();
+      executor.shutdown();
+      PluginManager.reset();
+
+      long s = System.currentTimeMillis();
+      boolean started = true;
+      while (System.currentTimeMillis() - s < 1000 * 60 * 3) {  // 3 minutes
+        Thread.sleep(2000);
+        started = checkIfServerIsRunning();
+        if (started == false) {
+          break;
+        }
+      }
+      if (started == true) {
+        throw new RuntimeException("Can not stop Zeppelin server");
+      }
+
+      LOG.info("Test Zeppelin terminated.");
+
+      if (deleteConfDir && !TestUtils.getInstance(Notebook.class).getConf().isRecoveryEnabled()) {
+        // don't delete interpreter.json when recovery is enabled. otherwise the interpreter setting
+        // id will change after zeppelin restart, then we can not recover interpreter process
+        // properly
+        FileUtils.deleteDirectory(confDir);
+      }
+    }
+
+  }
+
+  /**
+   * Probes the server by requesting {@code /version}.
+   *
+   * @return {@code true} when the request returns status 200; {@code false} on any
+   *         other status or when the request fails with an {@link IOException}
+   */
+  protected static boolean checkIfServerIsRunning() {
+    GetMethod probe = null;
+    try {
+      probe = httpGet("/version");
+      return probe.getStatusCode() == 200;
+    } catch (IOException e) {
+      LOG.error("AbstractTestRestApi.checkIfServerIsRunning() fails .. ZeppelinServer is not " +
+          "running");
+      return false;
+    } finally {
+      // always hand the connection back, whatever the outcome
+      if (probe != null) {
+        probe.releaseConnection();
+      }
+    }
+  }
+
+  protected static GetMethod httpGet(String path) throws IOException {
+    return httpGet(path, StringUtils.EMPTY, StringUtils.EMPTY);
+  }
+
+  /**
+   * Sends a GET request to {@code path} with the given credentials and no extra cookies.
+   *
+   * @param path request path appended to the base URL
+   * @param user user name passed on to the four-argument overload
+   * @param pwd password passed on to the four-argument overload
+   * @return the executed method
+   * @throws IOException if the request cannot be executed
+   */
+  protected static GetMethod httpGet(String path, String user, String pwd) throws IOException {
+    final String noExtraCookies = StringUtils.EMPTY;
+    return httpGet(path, user, pwd, noExtraCookies);
+  }
+
+  /**
+   * Executes a GET request against {@code URL + path} and returns the executed method.
+   *
+   * <p>When {@link #userAndPasswordAreNotBlank(String, String)} accepts the credentials, a
+   * session id is obtained through {@code getCookie} and sent as {@code JSESSIONID}. Any
+   * non-blank {@code cookies} are appended to that Cookie header. This method does not release
+   * the connection; the caller has to do so.
+   *
+   * @param path request path appended to the base URL
+   * @param user user name used to log in, may be blank
+   * @param pwd password used to log in, may be blank
+   * @param cookies additional cookie text to send, may be blank
+   * @return the executed method
+   * @throws IOException if the login or the request cannot be executed
+   */
+  protected static GetMethod httpGet(String path, String user, String pwd, String cookies)
+      throws IOException {
+    LOG.info("Connecting to {}", URL + path);
+    HttpClient httpClient = new HttpClient();
+    GetMethod getMethod = new GetMethod(URL + path);
+    getMethod.addRequestHeader("Origin", URL);
+    if (userAndPasswordAreNotBlank(user, pwd)) {
+      getMethod.setRequestHeader("Cookie", "JSESSIONID=" + getCookie(user, pwd));
+    }
+    if (!StringUtils.isBlank(cookies)) {
+      // Extend the Cookie *request* header set above, if there is one. Nothing has been
+      // executed yet, so a response header lookup would always yield null here.
+      Header sessionCookie = getMethod.getRequestHeader("Cookie");
+      String cookieValue = (sessionCookie == null)
+          ? cookies
+          : sessionCookie.getValue() + ";" + cookies;
+      getMethod.setRequestHeader("Cookie", cookieValue);
+    }
+    httpClient.executeMethod(getMethod);
+    LOG.info("{} - {}", getMethod.getStatusCode(), getMethod.getStatusText());
+    return getMethod;
+  }
+
+  /**
+   * Sends a PUT request with {@code body} to {@code path} without credentials.
+   *
+   * @param path request path appended to the base URL
+   * @param body request payload
+   * @return the executed method
+   * @throws IOException if the request cannot be executed
+   */
+  protected static PutMethod httpPut(String path, String body) throws IOException {
+    final String noUser = StringUtils.EMPTY;
+    final String noPassword = StringUtils.EMPTY;
+    return httpPut(path, body, noUser, noPassword);
+  }
+
+  /**
+   * Executes a PUT request against {@code URL + path} with {@code body} encoded as UTF-8.
+   *
+   * <p>When {@link #userAndPasswordAreNotBlank(String, String)} accepts the credentials, a
+   * session id is obtained through {@code getCookie} and sent as {@code JSESSIONID}.
+   *
+   * @param path request path appended to the base URL
+   * @param body request payload
+   * @param user user name used to log in, may be blank
+   * @param pwd password used to log in, may be blank
+   * @return the executed method
+   * @throws IOException if the login or the request cannot be executed
+   */
+  protected static PutMethod httpPut(String path, String body, String user, String pwd)
+      throws IOException {
+    final String target = URL + path;
+    LOG.info("Connecting to {}", target);
+    PutMethod putMethod = new PutMethod(target);
+    putMethod.addRequestHeader("Origin", URL);
+    putMethod.setRequestEntity(new ByteArrayRequestEntity(body.getBytes("UTF-8")));
+    if (userAndPasswordAreNotBlank(user, pwd)) {
+      String sessionId = getCookie(user, pwd);
+      putMethod.setRequestHeader("Cookie", "JSESSIONID=" + sessionId);
+    }
+    new HttpClient().executeMethod(putMethod);
+    LOG.info("{} - {}", putMethod.getStatusCode(), putMethod.getStatusText());
+    return putMethod;
+  }
+
+  /**
+   * Sends a POST request with {@code body} to {@code path} without credentials.
+   *
+   * @param path request path appended to the base URL
+   * @param body request payload
+   * @return the executed method
+   * @throws IOException if the request cannot be executed
+   */
+  protected static PostMethod httpPost(String path, String body) throws IOException {
+    final String noUser = StringUtils.EMPTY;
+    final String noPassword = StringUtils.EMPTY;
+    return httpPost(path, body, noUser, noPassword);
+  }
+
+  /**
+   * Executes a POST request against {@code URL + path} with {@code request} as its body.
+   *
+   * <p>The method's cookie policy is set to {@link CookiePolicy#IGNORE_COOKIES}. When
+   * {@link #userAndPasswordAreNotBlank(String, String)} accepts the credentials, a session id
+   * is obtained through {@code getCookie} and sent explicitly as {@code JSESSIONID}.
+   *
+   * @param path request path appended to the base URL
+   * @param request request payload
+   * @param user user name used to log in, may be blank
+   * @param pwd password used to log in, may be blank
+   * @return the executed method
+   * @throws IOException if the login or the request cannot be executed
+   */
+  protected static PostMethod httpPost(String path, String request, String user, String pwd)
+      throws IOException {
+    final String target = URL + path;
+    LOG.info("Connecting to {}", target);
+    PostMethod postMethod = new PostMethod(target);
+    postMethod.setRequestBody(request);
+    postMethod.getParams().setCookiePolicy(CookiePolicy.IGNORE_COOKIES);
+    if (userAndPasswordAreNotBlank(user, pwd)) {
+      String sessionId = getCookie(user, pwd);
+      postMethod.setRequestHeader("Cookie", "JSESSIONID=" + sessionId);
+    }
+    new HttpClient().executeMethod(postMethod);
+    LOG.info("{} - {}", postMethod.getStatusCode(), postMethod.getStatusText());
+    return postMethod;
+  }
+
+  /**
+   * Logs in through {@code /login} and extracts the session id from the response.
+   *
+   * @param user value sent as the {@code userName} form parameter
+   * @param password value sent as the {@code password} form parameter
+   * @return the {@code JSESSIONID} value found in the {@code Set-Cookie} response headers, or
+   *         an empty string when none of them carries one
+   * @throws IOException if the login request cannot be executed
+   */
+  private static String getCookie(String user, String password) throws IOException {
+    HttpClient httpClient = new HttpClient();
+    PostMethod postMethod = new PostMethod(URL + "/login");
+    postMethod.addRequestHeader("Origin", URL);
+    postMethod.setParameter("password", password);
+    postMethod.setParameter("userName", user);
+    try {
+      httpClient.executeMethod(postMethod);
+      LOG.info("{} - {}", postMethod.getStatusCode(), postMethod.getStatusText());
+      Pattern pattern = Pattern.compile("JSESSIONID=([a-zA-Z0-9-]*)");
+      String jsessionId = StringUtils.EMPTY;
+      for (Header setCookie : postMethod.getResponseHeaders("Set-Cookie")) {
+        java.util.regex.Matcher matcher = pattern.matcher(setCookie.toString());
+        if (matcher.find()) {
+          // No break: when several headers match, the last one wins.
+          jsessionId = matcher.group(1);
+        }
+      }
+      return jsessionId;
+    } finally {
+      // The login method never leaves this helper, so its connection must be released here.
+      postMethod.releaseConnection();
+    }
+  }
+
+  /**
+   * Tells whether both credentials are usable for a login attempt.
+   *
+   * @param user user name to check
+   * @param pwd password to check
+   * @return {@code true} only when neither {@code user} nor {@code pwd} is blank
+   */
+  protected static boolean userAndPasswordAreNotBlank(String user, String pwd) {
+    // Both values are required; a single blank credential must not trigger a login attempt.
+    return StringUtils.isNotBlank(user) && StringUtils.isNotBlank(pwd);
+  }
+
+  /**
+   * Builds a matcher that accepts an HTTP method whose status code equals
+   * {@code expectedStatusCode}.
+   *
+   * @param expectedStatusCode status code the examined method must report
+   * @return a matcher over {@link HttpMethodBase}
+   */
+  protected Matcher<HttpMethodBase> responsesWith(final int expectedStatusCode) {
+    return new TypeSafeMatcher<HttpMethodBase>() {
+      // First method this matcher examined; only used to report its path in describeTo().
+      WeakReference<HttpMethodBase> method;
+
+      @Override
+      public boolean matchesSafely(HttpMethodBase httpMethodBase) {
+        method = (method == null) ? new WeakReference<>(httpMethodBase) : method;
+        return httpMethodBase.getStatusCode() == expectedStatusCode;
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("HTTP response ").appendValue(expectedStatusCode);
+        // The path is only available after matchesSafely() has run and while the weakly
+        // referenced method is still reachable; leave it out otherwise instead of failing.
+        HttpMethodBase examined = (method == null) ? null : method.get();
+        if (examined != null) {
+          description.appendText(" from ").appendText(examined.getPath());
+        }
+      }
+
+      @Override
+      protected void describeMismatchSafely(HttpMethodBase item, Description description) {
+        description.appendText("got ").appendValue(item.getStatusCode()).appendText(" ")
+            .appendText(item.getStatusText());
+      }
+    };
+  }
+
+  /**
+   * Status code matcher.
+   *
+   * @return a matcher, built by {@link #responsesWith(int)}, that expects status code 403
+   */
+  protected Matcher<? super HttpMethodBase> isForbidden() {
+    return responsesWith(403);
+  }
+
+  /**
+   * Status code matcher.
+   *
+   * @return a matcher, built by {@link #responsesWith(int)}, that expects status code 200
+   */
+  protected Matcher<? super HttpMethodBase> isAllowed() {
+    return responsesWith(200);
+  }
+
+  /**
+   * Status code matcher.
+   *
+   * @return a matcher, built by {@link #responsesWith(int)}, that expects status code 201
+   */
+  protected Matcher<? super HttpMethodBase> isCreated() {
+    return responsesWith(201);
+  }
+
+  /**
+   * Status code matcher.
+   *
+   * @return a matcher, built by {@link #responsesWith(int)}, that expects status code 400
+   */
+  protected Matcher<? super HttpMethodBase> isBadRequest() {
+    return responsesWith(400);
+  }
+
+  /**
+   * Status code matcher.
+   *
+   * @return a matcher, built by {@link #responsesWith(int)}, that expects status code 404
+   */
+  protected Matcher<? super HttpMethodBase> isNotFound() {
+    return responsesWith(404);
+  }
+
+  /**
+   * Status code matcher.
+   *
+   * @return a matcher, built by {@link #responsesWith(int)}, that expects status code 405
+   */
+  protected Matcher<? super HttpMethodBase> isNotAllowed() {
+    return responsesWith(405);
+  }
+}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java
index 7a2cde7..5f0ea36 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/AuthorizationService.java
@@ -20,7 +20,13 @@ package org.apache.zeppelin.notebook;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.apache.zeppelin.cluster.event.ClusterEvent;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.cluster.event.ClusterMessage;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
@@ -37,7 +43,7 @@ import java.util.Set;
  * This class is responsible for maintain notes authorization info. And provide api for
  * setting and querying note authorization info.
  */
-public class AuthorizationService {
+public class AuthorizationService implements ClusterEventListener {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(AuthorizationService.class);
   private static final Set<String> EMPTY_SET = new HashSet<>();
@@ -66,21 +72,41 @@ public class AuthorizationService {
   }
 
+  /**
+   * Sets the owners of a note on this server, then hands the change to
+   * {@code broadcastClusterEvent} as {@code SET_OWNERS_PERMISSIONS}.
+   *
+   * <p>The set passed to the broadcast is the caller's {@code entities}, not the result of
+   * {@code validateUser}.
+   *
+   * @param noteId id of the note whose owners are replaced
+   * @param entities new owners
+   */
   public void setOwners(String noteId, Set<String> entities) {
+    inlineSetOwners(noteId, entities);
+    broadcastClusterEvent(ClusterEvent.SET_OWNERS_PERMISSIONS, noteId, null, entities);
+  }
+
+  /**
+   * Sets the owners of a note on this server only, without broadcasting. Called by
+   * {@code setOwners} and by {@code onClusterEvent} for {@code SET_OWNERS_PERMISSIONS}.
+   *
+   * @param noteId id of the note whose owners are replaced
+   * @param entities new owners; passed through {@code validateUser} before being stored
+   */
+  private void inlineSetOwners(String noteId, Set<String> entities) {
     entities = validateUser(entities);
+    // NOTE(review): the result of getNote(noteId) is dereferenced without a null check.
     notebook.getNote(noteId).setOwners(entities);
   }
 
+  /**
+   * Sets the readers of a note on this server, then hands the change to
+   * {@code broadcastClusterEvent} as {@code SET_READERS_PERMISSIONS}.
+   *
+   * <p>The set passed to the broadcast is the caller's {@code entities}, not the result of
+   * {@code validateUser}.
+   *
+   * @param noteId id of the note whose readers are replaced
+   * @param entities new readers
+   */
   public void setReaders(String noteId, Set<String> entities) {
+    inlineSetReaders(noteId, entities);
+    broadcastClusterEvent(ClusterEvent.SET_READERS_PERMISSIONS, noteId, null, entities);
+  }
+
+  /**
+   * Sets the readers of a note on this server only, without broadcasting. Called by
+   * {@code setReaders} and by {@code onClusterEvent} for {@code SET_READERS_PERMISSIONS}.
+   *
+   * @param noteId id of the note whose readers are replaced
+   * @param entities new readers; passed through {@code validateUser} before being stored
+   */
+  private void inlineSetReaders(String noteId, Set<String> entities) {
     entities = validateUser(entities);
+    // NOTE(review): the result of getNote(noteId) is dereferenced without a null check.
     notebook.getNote(noteId).setReaders(entities);
   }
 
+  /**
+   * Sets the runners of a note on this server, then hands the change to
+   * {@code broadcastClusterEvent} as {@code SET_RUNNERS_PERMISSIONS}.
+   *
+   * <p>The set passed to the broadcast is the caller's {@code entities}, not the result of
+   * {@code validateUser}.
+   *
+   * @param noteId id of the note whose runners are replaced
+   * @param entities new runners
+   */
   public void setRunners(String noteId, Set<String> entities) {
+    inlineSetRunners(noteId, entities);
+    broadcastClusterEvent(ClusterEvent.SET_RUNNERS_PERMISSIONS, noteId, null, entities);
+  }
+
+  /**
+   * Sets the runners of a note on this server only, without broadcasting. Called by
+   * {@code setRunners} and by {@code onClusterEvent} for {@code SET_RUNNERS_PERMISSIONS}.
+   *
+   * @param noteId id of the note whose runners are replaced
+   * @param entities new runners; passed through {@code validateUser} before being stored
+   */
+  private void inlineSetRunners(String noteId, Set<String> entities) {
     entities = validateUser(entities);
+    // NOTE(review): the result of getNote(noteId) is dereferenced without a null check.
     notebook.getNote(noteId).setRunners(entities);
   }
 
+  /**
+   * Sets the writers of a note on this server, then hands the change to
+   * {@code broadcastClusterEvent} as {@code SET_WRITERS_PERMISSIONS}.
+   *
+   * <p>The set passed to the broadcast is the caller's {@code entities}, not the result of
+   * {@code validateUser}.
+   *
+   * @param noteId id of the note whose writers are replaced
+   * @param entities new writers
+   */
   public void setWriters(String noteId, Set<String> entities) {
+    inlineSetWriters(noteId, entities);
+    broadcastClusterEvent(ClusterEvent.SET_WRITERS_PERMISSIONS, noteId, null, entities);
+  }
+
+  /**
+   * Sets the writers of a note on this server only, without broadcasting. Called by
+   * {@code setWriters} and by {@code onClusterEvent} for {@code SET_WRITERS_PERMISSIONS}.
+   *
+   * @param noteId id of the note whose writers are replaced
+   * @param entities new writers; passed through {@code validateUser} before being stored
+   */
+  private void inlineSetWriters(String noteId, Set<String> entities) {
     entities = validateUser(entities);
+    // NOTE(review): the result of getNote(noteId) is dereferenced without a null check.
     notebook.getNote(noteId).setWriters(entities);
   }
@@ -211,6 +237,11 @@ public class AuthorizationService {
   }
 
+  /**
+   * Sets the roles of a user on this server through {@code inlineSetRoles}, then hands the
+   * change to {@code broadcastClusterEvent} as {@code SET_ROLES} (with a null note id).
+   *
+   * @param user user whose roles are set
+   * @param roles roles to assign
+   */
   public void setRoles(String user, Set<String> roles) {
+    inlineSetRoles(user, roles);
+    broadcastClusterEvent(ClusterEvent.SET_ROLES, null, user, roles);
+  }
+
+  private void inlineSetRoles(String user, Set<String> roles) {
     if (StringUtils.isBlank(user)) {
       LOGGER.warn("Setting roles for empty user");
       return;
@@ -241,9 +272,74 @@ public class AuthorizationService {
   }
 
+  /**
+   * Clears all permissions of a note on this server through {@code inlineClearPermission},
+   * then hands the change to {@code broadcastClusterEvent} as {@code CLEAR_PERMISSION} (with
+   * a null user and a null set).
+   *
+   * @param noteId id of the note whose permissions are cleared
+   */
   public void clearPermission(String noteId) {
+    inlineClearPermission(noteId);
+    broadcastClusterEvent(ClusterEvent.CLEAR_PERMISSION, noteId, null, null);
+  }
+
+  /**
+   * Replaces the readers, runners, writers and owners of a note with new empty sets, on this
+   * server only and without broadcasting. Called by {@code clearPermission} and by
+   * {@code onClusterEvent} for {@code CLEAR_PERMISSION}.
+   *
+   * @param noteId id of the note whose permissions are cleared
+   */
+  public void inlineClearPermission(String noteId) {
+    // NOTE(review): each getNote(noteId) result is dereferenced without a null check.
     notebook.getNote(noteId).setReaders(Sets.newHashSet());
     notebook.getNote(noteId).setRunners(Sets.newHashSet());
     notebook.getNote(noteId).setWriters(Sets.newHashSet());
     notebook.getNote(noteId).setOwners(Sets.newHashSet());
   }
+
+  /**
+   * Applies an authorization change described by a serialized cluster message to this
+   * server, using the non-broadcasting {@code inline*} methods.
+   *
+   * <p>The message fields {@code noteId}, {@code user} and {@code set} (a JSON-encoded set of
+   * strings) are read; which of them is used depends on the message's cluster event.
+   * Unhandled events are logged as errors.
+   *
+   * @param msg serialized {@link ClusterMessage}
+   */
+  @Override
+  public void onClusterEvent(String msg) {
+    // The logger checks the level itself, so no explicit isDebugEnabled() guard is needed.
+    LOGGER.debug("onClusterEvent : {}", msg);
+
+    final ClusterMessage clusterMessage = ClusterMessage.deserializeMessage(msg);
+    final String targetNoteId = clusterMessage.get("noteId");
+    final String targetUser = clusterMessage.get("user");
+    final String entitiesJson = clusterMessage.get("set");
+    final Set<String> entities = new Gson().fromJson(
+        entitiesJson, new TypeToken<Set<String>>() { }.getType());
+
+    switch (clusterMessage.clusterEvent) {
+      case SET_READERS_PERMISSIONS:
+        inlineSetReaders(targetNoteId, entities);
+        break;
+      case SET_WRITERS_PERMISSIONS:
+        inlineSetWriters(targetNoteId, entities);
+        break;
+      case SET_OWNERS_PERMISSIONS:
+        inlineSetOwners(targetNoteId, entities);
+        break;
+      case SET_RUNNERS_PERMISSIONS:
+        inlineSetRunners(targetNoteId, entities);
+        break;
+      case SET_ROLES:
+        inlineSetRoles(targetUser, entities);
+        break;
+      case CLEAR_PERMISSION:
+        inlineClearPermission(targetNoteId);
+        break;
+      default:
+        LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterMessage.clusterEvent, msg);
+        break;
+    }
+  }
+
+  /**
+   * Publishes an authorization change on {@code CLUSTER_AUTH_EVENT_TOPIC}. Does nothing when
+   * the configuration returned by {@code ZeppelinConfiguration.create()} is not in cluster
+   * mode.
+   *
+   * @param event kind of change being announced
+   * @param noteId value stored under the message key {@code noteId}; may be null
+   * @param user value stored under the message key {@code user}; may be null
+   * @param set entities or roles, stored JSON-encoded under the message key {@code set};
+   *            may be null
+   */
+  private void broadcastClusterEvent(ClusterEvent event, String noteId,
+                                     String user, Set<String> set) {
+    if (!ZeppelinConfiguration.create().isClusterMode()) {
+      return;
+    }
+
+    final ClusterMessage clusterMessage = new ClusterMessage(event);
+    clusterMessage.put("noteId", noteId);
+    clusterMessage.put("user", user);
+    clusterMessage.put("set",
+        new Gson().toJson(set, new TypeToken<Set<String>>() { }.getType()));
+
+    final String payload = ClusterMessage.serializeMessage(clusterMessage);
+    ClusterManagerServer.getInstance().broadcastClusterEvent(
+        ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC, payload);
+  }
 }
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java
index 0443891..8429e0c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.notebook;
 
 import java.io.IOException;
+import java.lang.reflect.GenericSignatureFormatError;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -26,7 +27,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.gson.Gson;
 import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.apache.zeppelin.cluster.event.ClusterEvent;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.cluster.event.ClusterMessage;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.scheduler.Job;
@@ -42,7 +48,7 @@ import com.google.common.collect.Sets;
 /**
  * Contains authorization information for notes
  */
-public class NotebookAuthorization implements NoteEventListener {
+public class NotebookAuthorization implements NoteEventListener, ClusterEventListener {
   private static final Logger LOG = LoggerFactory.getLogger(NotebookAuthorization.class);
   private static NotebookAuthorization instance = null;
   /*
@@ -64,6 +70,8 @@ public class NotebookAuthorization implements NoteEventListener {
     if (instance == null) {
       instance = new NotebookAuthorization();
       conf = config;
+      ClusterManagerServer.getInstance().addClusterEventListeners(
+          ClusterManagerServer.CLUSTER_NB_AUTH_EVENT_TOPIC, instance);
       try {
         configStorage = ConfigStorage.getInstance(config);
         loadFromFile();
@@ -378,8 +386,13 @@ public class NotebookAuthorization implements NoteEventListener {
       }
     }).toList();
   }
-  
+
+  /**
+   * Sets the permissions of a new note on this server through
+   * {@code inlineSetNewNotePermissions}, then hands the note id and subject to
+   * {@code broadcastClusterEvent} as {@code SET_NEW_NOTE_PERMISSIONS}.
+   *
+   * @param noteId id of the new note
+   * @param subject authentication info passed to both calls
+   */
   public void setNewNotePermissions(String noteId, AuthenticationInfo subject) {
+    inlineSetNewNotePermissions(noteId, subject);
+    broadcastClusterEvent(ClusterEvent.SET_NEW_NOTE_PERMISSIONS, noteId, subject);
+  }
+  
+  public void inlineSetNewNotePermissions(String noteId, AuthenticationInfo subject) {
     if (!AuthenticationInfo.isAnonymous(subject)) {
       if (isPublic()) {
         // add current user to owners - can be public
@@ -387,16 +400,20 @@ public class NotebookAuthorization implements NoteEventListener {
         owners.add(subject.getUser());
         setOwners(noteId, owners);
       } else {
+        Map<ClusterEvent, Set<String>> mapEntities = new HashMap<>();
         // add current user to owners, readers, runners, writers - private note
         Set<String> entities = getOwners(noteId);
         entities.add(subject.getUser());
         setOwners(noteId, entities);
+
         entities = getReaders(noteId);
         entities.add(subject.getUser());
         setReaders(noteId, entities);
+
         entities = getRunners(noteId);
         entities.add(subject.getUser());
         setRunners(noteId, entities);
+
         entities = getWriters(noteId);
         entities.add(subject.getUser());
         setWriters(noteId, entities);
@@ -438,4 +455,31 @@ public class NotebookAuthorization implements NoteEventListener {
   public void onParagraphStatusChange(Paragraph p, Job.Status status) {
 
   }
+
+  /**
+   * Applies a new-note permission change described by a serialized cluster message to this
+   * server.
+   *
+   * <p>The message fields {@code noteId} and {@code subject} (a JSON-encoded
+   * {@link AuthenticationInfo}) are read and passed to {@code inlineSetNewNotePermissions},
+   * which does not broadcast.
+   *
+   * @param msg serialized {@link ClusterMessage}
+   */
+  @Override
+  public void onClusterEvent(String msg) {
+    // Debug-level trace; the logger checks the level itself, so no guard is needed.
+    LOG.debug("onClusterEvent : {}", msg);
+    ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+    String noteId  = message.get("noteId");
+    String json  = message.get("subject");
+    AuthenticationInfo subject = AuthenticationInfo.fromJson(json);
+
+    inlineSetNewNotePermissions(noteId, subject);
+  }
+
+  /**
+   * Publishes a new-note permission change on {@code CLUSTER_NB_AUTH_EVENT_TOPIC}, the topic
+   * this class registers its listener on. Does nothing when {@code conf} is not in cluster
+   * mode.
+   *
+   * @param event kind of change being announced
+   * @param noteId value stored under the message key {@code noteId}
+   * @param subject authentication info, stored JSON-encoded under the message key
+   *                {@code subject}
+   */
+  private void broadcastClusterEvent(ClusterEvent event, String noteId,
+                                     AuthenticationInfo subject) {
+    if (!conf.isClusterMode()) {
+      return;
+    }
+
+    ClusterMessage message = new ClusterMessage(event);
+    message.put("noteId", noteId);
+    message.put("subject", subject.toJson());
+    String msg = ClusterMessage.serializeMessage(message);
+    // Must match the topic used in addClusterEventListeners(); otherwise the message never
+    // reaches onClusterEvent() of this class on the other servers.
+    ClusterManagerServer.getInstance().broadcastClusterEvent(
+        ClusterManagerServer.CLUSTER_NB_AUTH_EVENT_TOPIC, msg);
+  }
 }