You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/15 01:09:58 UTC

[03/13] zeppelin git commit: ZEPPELIN-2035. BI directional RPC framework between ZeppelinServer and InterpreterProcess on top of thrift

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
new file mode 100644
index 0000000..308364f
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
@@ -0,0 +1,828 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
+public class RunParagraphsEvent implements org.apache.thrift.TBase<RunParagraphsEvent, RunParagraphsEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RunParagraphsEvent> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RunParagraphsEvent");
+
+  private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField PARAGRAPH_IDS_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphIds", org.apache.thrift.protocol.TType.LIST, (short)2);
+  private static final org.apache.thrift.protocol.TField PARAGRAPH_INDICES_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphIndices", org.apache.thrift.protocol.TType.LIST, (short)3);
+  private static final org.apache.thrift.protocol.TField CUR_PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("curParagraphId", org.apache.thrift.protocol.TType.STRING, (short)4);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new RunParagraphsEventStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new RunParagraphsEventTupleSchemeFactory());
+  }
+
+  public String noteId; // required
+  public List<String> paragraphIds; // required
+  public List<Integer> paragraphIndices; // required
+  public String curParagraphId; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    NOTE_ID((short)1, "noteId"),
+    PARAGRAPH_IDS((short)2, "paragraphIds"),
+    PARAGRAPH_INDICES((short)3, "paragraphIndices"),
+    CUR_PARAGRAPH_ID((short)4, "curParagraphId");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // NOTE_ID
+          return NOTE_ID;
+        case 2: // PARAGRAPH_IDS
+          return PARAGRAPH_IDS;
+        case 3: // PARAGRAPH_INDICES
+          return PARAGRAPH_INDICES;
+        case 4: // CUR_PARAGRAPH_ID
+          return CUR_PARAGRAPH_ID;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.PARAGRAPH_IDS, new org.apache.thrift.meta_data.FieldMetaData("paragraphIds", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+    tmpMap.put(_Fields.PARAGRAPH_INDICES, new org.apache.thrift.meta_data.FieldMetaData("paragraphIndices", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))));
+    tmpMap.put(_Fields.CUR_PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("curParagraphId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RunParagraphsEvent.class, metaDataMap);
+  }
+
+  public RunParagraphsEvent() {
+  }
+
+  public RunParagraphsEvent(
+    String noteId,
+    List<String> paragraphIds,
+    List<Integer> paragraphIndices,
+    String curParagraphId)
+  {
+    this();
+    this.noteId = noteId;
+    this.paragraphIds = paragraphIds;
+    this.paragraphIndices = paragraphIndices;
+    this.curParagraphId = curParagraphId;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public RunParagraphsEvent(RunParagraphsEvent other) {
+    if (other.isSetNoteId()) {
+      this.noteId = other.noteId;
+    }
+    if (other.isSetParagraphIds()) {
+      List<String> __this__paragraphIds = new ArrayList<String>(other.paragraphIds);
+      this.paragraphIds = __this__paragraphIds;
+    }
+    if (other.isSetParagraphIndices()) {
+      List<Integer> __this__paragraphIndices = new ArrayList<Integer>(other.paragraphIndices);
+      this.paragraphIndices = __this__paragraphIndices;
+    }
+    if (other.isSetCurParagraphId()) {
+      this.curParagraphId = other.curParagraphId;
+    }
+  }
+
+  public RunParagraphsEvent deepCopy() {
+    return new RunParagraphsEvent(this);
+  }
+
+  @Override
+  public void clear() {
+    this.noteId = null;
+    this.paragraphIds = null;
+    this.paragraphIndices = null;
+    this.curParagraphId = null;
+  }
+
+  public String getNoteId() {
+    return this.noteId;
+  }
+
+  public RunParagraphsEvent setNoteId(String noteId) {
+    this.noteId = noteId;
+    return this;
+  }
+
+  public void unsetNoteId() {
+    this.noteId = null;
+  }
+
+  /** Returns true if field noteId is set (has been assigned a value) and false otherwise */
+  public boolean isSetNoteId() {
+    return this.noteId != null;
+  }
+
+  public void setNoteIdIsSet(boolean value) {
+    if (!value) {
+      this.noteId = null;
+    }
+  }
+
+  public int getParagraphIdsSize() {
+    return (this.paragraphIds == null) ? 0 : this.paragraphIds.size();
+  }
+
+  public java.util.Iterator<String> getParagraphIdsIterator() {
+    return (this.paragraphIds == null) ? null : this.paragraphIds.iterator();
+  }
+
+  public void addToParagraphIds(String elem) {
+    if (this.paragraphIds == null) {
+      this.paragraphIds = new ArrayList<String>();
+    }
+    this.paragraphIds.add(elem);
+  }
+
+  public List<String> getParagraphIds() {
+    return this.paragraphIds;
+  }
+
+  public RunParagraphsEvent setParagraphIds(List<String> paragraphIds) {
+    this.paragraphIds = paragraphIds;
+    return this;
+  }
+
+  public void unsetParagraphIds() {
+    this.paragraphIds = null;
+  }
+
+  /** Returns true if field paragraphIds is set (has been assigned a value) and false otherwise */
+  public boolean isSetParagraphIds() {
+    return this.paragraphIds != null;
+  }
+
+  public void setParagraphIdsIsSet(boolean value) {
+    if (!value) {
+      this.paragraphIds = null;
+    }
+  }
+
+  public int getParagraphIndicesSize() {
+    return (this.paragraphIndices == null) ? 0 : this.paragraphIndices.size();
+  }
+
+  public java.util.Iterator<Integer> getParagraphIndicesIterator() {
+    return (this.paragraphIndices == null) ? null : this.paragraphIndices.iterator();
+  }
+
+  public void addToParagraphIndices(int elem) {
+    if (this.paragraphIndices == null) {
+      this.paragraphIndices = new ArrayList<Integer>();
+    }
+    this.paragraphIndices.add(elem);
+  }
+
+  public List<Integer> getParagraphIndices() {
+    return this.paragraphIndices;
+  }
+
+  public RunParagraphsEvent setParagraphIndices(List<Integer> paragraphIndices) {
+    this.paragraphIndices = paragraphIndices;
+    return this;
+  }
+
+  public void unsetParagraphIndices() {
+    this.paragraphIndices = null;
+  }
+
+  /** Returns true if field paragraphIndices is set (has been assigned a value) and false otherwise */
+  public boolean isSetParagraphIndices() {
+    return this.paragraphIndices != null;
+  }
+
+  public void setParagraphIndicesIsSet(boolean value) {
+    if (!value) {
+      this.paragraphIndices = null;
+    }
+  }
+
+  public String getCurParagraphId() {
+    return this.curParagraphId;
+  }
+
+  public RunParagraphsEvent setCurParagraphId(String curParagraphId) {
+    this.curParagraphId = curParagraphId;
+    return this;
+  }
+
+  public void unsetCurParagraphId() {
+    this.curParagraphId = null;
+  }
+
+  /** Returns true if field curParagraphId is set (has been assigned a value) and false otherwise */
+  public boolean isSetCurParagraphId() {
+    return this.curParagraphId != null;
+  }
+
+  public void setCurParagraphIdIsSet(boolean value) {
+    if (!value) {
+      this.curParagraphId = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case NOTE_ID:
+      if (value == null) {
+        unsetNoteId();
+      } else {
+        setNoteId((String)value);
+      }
+      break;
+
+    case PARAGRAPH_IDS:
+      if (value == null) {
+        unsetParagraphIds();
+      } else {
+        setParagraphIds((List<String>)value);
+      }
+      break;
+
+    case PARAGRAPH_INDICES:
+      if (value == null) {
+        unsetParagraphIndices();
+      } else {
+        setParagraphIndices((List<Integer>)value);
+      }
+      break;
+
+    case CUR_PARAGRAPH_ID:
+      if (value == null) {
+        unsetCurParagraphId();
+      } else {
+        setCurParagraphId((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case NOTE_ID:
+      return getNoteId();
+
+    case PARAGRAPH_IDS:
+      return getParagraphIds();
+
+    case PARAGRAPH_INDICES:
+      return getParagraphIndices();
+
+    case CUR_PARAGRAPH_ID:
+      return getCurParagraphId();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case NOTE_ID:
+      return isSetNoteId();
+    case PARAGRAPH_IDS:
+      return isSetParagraphIds();
+    case PARAGRAPH_INDICES:
+      return isSetParagraphIndices();
+    case CUR_PARAGRAPH_ID:
+      return isSetCurParagraphId();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof RunParagraphsEvent)
+      return this.equals((RunParagraphsEvent)that);
+    return false;
+  }
+
+  public boolean equals(RunParagraphsEvent that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_noteId = true && this.isSetNoteId();
+    boolean that_present_noteId = true && that.isSetNoteId();
+    if (this_present_noteId || that_present_noteId) {
+      if (!(this_present_noteId && that_present_noteId))
+        return false;
+      if (!this.noteId.equals(that.noteId))
+        return false;
+    }
+
+    boolean this_present_paragraphIds = true && this.isSetParagraphIds();
+    boolean that_present_paragraphIds = true && that.isSetParagraphIds();
+    if (this_present_paragraphIds || that_present_paragraphIds) {
+      if (!(this_present_paragraphIds && that_present_paragraphIds))
+        return false;
+      if (!this.paragraphIds.equals(that.paragraphIds))
+        return false;
+    }
+
+    boolean this_present_paragraphIndices = true && this.isSetParagraphIndices();
+    boolean that_present_paragraphIndices = true && that.isSetParagraphIndices();
+    if (this_present_paragraphIndices || that_present_paragraphIndices) {
+      if (!(this_present_paragraphIndices && that_present_paragraphIndices))
+        return false;
+      if (!this.paragraphIndices.equals(that.paragraphIndices))
+        return false;
+    }
+
+    boolean this_present_curParagraphId = true && this.isSetCurParagraphId();
+    boolean that_present_curParagraphId = true && that.isSetCurParagraphId();
+    if (this_present_curParagraphId || that_present_curParagraphId) {
+      if (!(this_present_curParagraphId && that_present_curParagraphId))
+        return false;
+      if (!this.curParagraphId.equals(that.curParagraphId))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_noteId = true && (isSetNoteId());
+    list.add(present_noteId);
+    if (present_noteId)
+      list.add(noteId);
+
+    boolean present_paragraphIds = true && (isSetParagraphIds());
+    list.add(present_paragraphIds);
+    if (present_paragraphIds)
+      list.add(paragraphIds);
+
+    boolean present_paragraphIndices = true && (isSetParagraphIndices());
+    list.add(present_paragraphIndices);
+    if (present_paragraphIndices)
+      list.add(paragraphIndices);
+
+    boolean present_curParagraphId = true && (isSetCurParagraphId());
+    list.add(present_curParagraphId);
+    if (present_curParagraphId)
+      list.add(curParagraphId);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(RunParagraphsEvent other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetNoteId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetParagraphIds()).compareTo(other.isSetParagraphIds());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetParagraphIds()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphIds, other.paragraphIds);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetParagraphIndices()).compareTo(other.isSetParagraphIndices());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetParagraphIndices()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphIndices, other.paragraphIndices);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetCurParagraphId()).compareTo(other.isSetCurParagraphId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetCurParagraphId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.curParagraphId, other.curParagraphId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("RunParagraphsEvent(");
+    boolean first = true;
+
+    sb.append("noteId:");
+    if (this.noteId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.noteId);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("paragraphIds:");
+    if (this.paragraphIds == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.paragraphIds);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("paragraphIndices:");
+    if (this.paragraphIndices == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.paragraphIndices);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("curParagraphId:");
+    if (this.curParagraphId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.curParagraphId);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class RunParagraphsEventStandardSchemeFactory implements SchemeFactory {
+    public RunParagraphsEventStandardScheme getScheme() {
+      return new RunParagraphsEventStandardScheme();
+    }
+  }
+
+  private static class RunParagraphsEventStandardScheme extends StandardScheme<RunParagraphsEvent> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, RunParagraphsEvent struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // NOTE_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.noteId = iprot.readString();
+              struct.setNoteIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // PARAGRAPH_IDS
+            if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+              {
+                org.apache.thrift.protocol.TList _list8 = iprot.readListBegin();
+                struct.paragraphIds = new ArrayList<String>(_list8.size);
+                String _elem9;
+                for (int _i10 = 0; _i10 < _list8.size; ++_i10)
+                {
+                  _elem9 = iprot.readString();
+                  struct.paragraphIds.add(_elem9);
+                }
+                iprot.readListEnd();
+              }
+              struct.setParagraphIdsIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // PARAGRAPH_INDICES
+            if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+              {
+                org.apache.thrift.protocol.TList _list11 = iprot.readListBegin();
+                struct.paragraphIndices = new ArrayList<Integer>(_list11.size);
+                int _elem12;
+                for (int _i13 = 0; _i13 < _list11.size; ++_i13)
+                {
+                  _elem12 = iprot.readI32();
+                  struct.paragraphIndices.add(_elem12);
+                }
+                iprot.readListEnd();
+              }
+              struct.setParagraphIndicesIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 4: // CUR_PARAGRAPH_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.curParagraphId = iprot.readString();
+              struct.setCurParagraphIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, RunParagraphsEvent struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.noteId != null) {
+        oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
+        oprot.writeString(struct.noteId);
+        oprot.writeFieldEnd();
+      }
+      if (struct.paragraphIds != null) {
+        oprot.writeFieldBegin(PARAGRAPH_IDS_FIELD_DESC);
+        {
+          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.paragraphIds.size()));
+          for (String _iter14 : struct.paragraphIds)
+          {
+            oprot.writeString(_iter14);
+          }
+          oprot.writeListEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+      if (struct.paragraphIndices != null) {
+        oprot.writeFieldBegin(PARAGRAPH_INDICES_FIELD_DESC);
+        {
+          oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, struct.paragraphIndices.size()));
+          for (int _iter15 : struct.paragraphIndices)
+          {
+            oprot.writeI32(_iter15);
+          }
+          oprot.writeListEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+      if (struct.curParagraphId != null) {
+        oprot.writeFieldBegin(CUR_PARAGRAPH_ID_FIELD_DESC);
+        oprot.writeString(struct.curParagraphId);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class RunParagraphsEventTupleSchemeFactory implements SchemeFactory {
+    public RunParagraphsEventTupleScheme getScheme() {
+      return new RunParagraphsEventTupleScheme();
+    }
+  }
+
+  private static class RunParagraphsEventTupleScheme extends TupleScheme<RunParagraphsEvent> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, RunParagraphsEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.isSetNoteId()) {
+        optionals.set(0);
+      }
+      if (struct.isSetParagraphIds()) {
+        optionals.set(1);
+      }
+      if (struct.isSetParagraphIndices()) {
+        optionals.set(2);
+      }
+      if (struct.isSetCurParagraphId()) {
+        optionals.set(3);
+      }
+      oprot.writeBitSet(optionals, 4);
+      if (struct.isSetNoteId()) {
+        oprot.writeString(struct.noteId);
+      }
+      if (struct.isSetParagraphIds()) {
+        {
+          oprot.writeI32(struct.paragraphIds.size());
+          for (String _iter16 : struct.paragraphIds)
+          {
+            oprot.writeString(_iter16);
+          }
+        }
+      }
+      if (struct.isSetParagraphIndices()) {
+        {
+          oprot.writeI32(struct.paragraphIndices.size());
+          for (int _iter17 : struct.paragraphIndices)
+          {
+            oprot.writeI32(_iter17);
+          }
+        }
+      }
+      if (struct.isSetCurParagraphId()) {
+        oprot.writeString(struct.curParagraphId);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, RunParagraphsEvent struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(4);
+      if (incoming.get(0)) {
+        struct.noteId = iprot.readString();
+        struct.setNoteIdIsSet(true);
+      }
+      if (incoming.get(1)) {
+        {
+          org.apache.thrift.protocol.TList _list18 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+          struct.paragraphIds = new ArrayList<String>(_list18.size);
+          String _elem19;
+          for (int _i20 = 0; _i20 < _list18.size; ++_i20)
+          {
+            _elem19 = iprot.readString();
+            struct.paragraphIds.add(_elem19);
+          }
+        }
+        struct.setParagraphIdsIsSet(true);
+      }
+      if (incoming.get(2)) {
+        {
+          org.apache.thrift.protocol.TList _list21 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, iprot.readI32());
+          struct.paragraphIndices = new ArrayList<Integer>(_list21.size);
+          int _elem22;
+          for (int _i23 = 0; _i23 < _list21.size; ++_i23)
+          {
+            _elem22 = iprot.readI32();
+            struct.paragraphIndices.add(_elem22);
+          }
+        }
+        struct.setParagraphIndicesIsSet(true);
+      }
+      if (incoming.get(3)) {
+        struct.curParagraphId = iprot.readString();
+        struct.setCurParagraphIdIsSet(true);
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
index a537d02..f31c69e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 /**
- * Autogenerated by Thrift Compiler (0.9.3)
+ * Autogenerated by Thrift Compiler (0.9.2)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
  *  @generated
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2018-05-22")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
 public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase<ZeppelinServerResourceParagraphRunner, ZeppelinServerResourceParagraphRunner._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResourceParagraphRunner> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner");
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
new file mode 100644
index 0000000..6d245c5
--- /dev/null
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+include "RemoteInterpreterService.thrift"
+
+namespace java org.apache.zeppelin.interpreter.thrift
+
+struct RegisterInfo {
+  1: string host,
+  2: i32 port
+  3: string interpreterGroupId
+}
+
+struct OutputAppendEvent {
+  1: string noteId,
+  2: string paragraphId,
+  3: i32 index,
+  4: string data,
+  5: string appId
+}
+
+struct OutputUpdateEvent {
+  1: string noteId,
+  2: string paragraphId,
+  3: i32 index,
+  4: string type,
+  5: string data,
+  6: string appId
+}
+
+struct OutputUpdateAllEvent {
+  1: string noteId,
+  2: string paragraphId,
+  3: list<RemoteInterpreterService.RemoteInterpreterResultMessage> msg,
+}
+
+struct RunParagraphsEvent {
+  1: string noteId,
+  2: list<string> paragraphIds,
+  3: list<i32> paragraphIndices,
+  4: string curParagraphId
+}
+
+struct AngularObjectId {
+  1: string noteId,
+  2: string paragraphId,
+  3: string name
+}
+
+struct AppOutputAppendEvent {
+  1: string noteId,
+  2: string paragraphId,
+  3: string appId,
+  4: i32 index,
+  5: string data
+}
+
+struct AppOutputUpdateEvent {
+  1: string noteId,
+  2: string paragraphId,
+  3: string appId,
+  4: i32 index,
+  5: string type,
+  6: string data
+}
+
+struct AppStatusUpdateEvent {
+  1: string noteId,
+  2: string paragraphId,
+  3: string appId,
+  4: string status
+}
+
+service RemoteInterpreterEventService {
+  void registerInterpreterProcess(1: RegisterInfo registerInfo);
+  void appendOutput(1: OutputAppendEvent event);
+  void updateOutput(1: OutputUpdateEvent event);
+  void updateAllOutput(1: OutputUpdateAllEvent event);
+
+  void appendAppOutput(1: AppOutputAppendEvent event);
+  void updateAppOutput(1: AppOutputUpdateEvent event);
+  void updateAppStatus(1: AppStatusUpdateEvent event);
+
+  void runParagraphs(1: RunParagraphsEvent event);
+
+  void addAngularObject(1: string intpGroupId, 2: string json);
+  void updateAngularObject(1: string intpGroupId, 2: string json);
+  void removeAngularObject(1: string intpGroupId, 2: string noteId, 3: string paragraphId, 4: string name);
+
+  void sendMetaInfo(1: string intpGroupId, 2: string json);
+  void sendParagraphInfo(1: string intpGroupId, 2: string json);
+
+  list<string> getAllResources(1: string intpGroupId);
+  binary getResource(1: string resourceIdJson);
+  binary invokeMethod(1: string intpGroupId, 2: string invokeMethodJson);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 559648a..fcc14b0 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -28,7 +28,6 @@ struct RemoteInterpreterContext {
   7: string config,   // json serialized config
   8: string gui,      // json serialized gui
   9: string noteGui,      // json serialized note gui
-  10: string runners   // json serialized runner
 }
 
 struct RemoteInterpreterResultMessage {
@@ -90,55 +89,40 @@ struct InterpreterCompletion {
   3: string meta
 }
 
-struct CallbackInfo {
-  1: string host,
-  2: i32 port
-}
+
 
 service RemoteInterpreterService {
 
-  void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties, 5: string userName);
-  void open(1: string sessionKey, 2: string className);
-  void close(1: string sessionKey, 2: string className);
-  RemoteInterpreterResult interpret(1: string sessionKey, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
-  void cancel(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
-  i32 getProgress(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
-  string getFormType(1: string sessionKey, 2: string className);
-  list<InterpreterCompletion> completion(1: string sessionKey, 2: string className, 3: string buf, 4: i32 cursor, 5: RemoteInterpreterContext interpreterContext);
+  void createInterpreter(1: string intpGroupId, 2: string sessionId, 3: string className, 4: map<string, string> properties, 5: string userName);
+  void open(1: string sessionId, 2: string className);
+  void close(1: string sessionId, 2: string className);
+  RemoteInterpreterResult interpret(1: string sessionId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
+  void cancel(1: string sessionId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
+  i32 getProgress(1: string sessionId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
+  string getFormType(1: string sessionId, 2: string className);
+  list<InterpreterCompletion> completion(1: string sessionId, 2: string className, 3: string buf, 4: i32 cursor, 5: RemoteInterpreterContext interpreterContext);
   void shutdown();
 
-  string getStatus(1: string sessionKey, 2:string jobId);
-
-  RemoteInterpreterEvent getEvent();
+  string getStatus(1: string sessionId, 2:string jobId);
 
-  // as a response, ZeppelinServer send list of resources to Interpreter process
-  void resourcePoolResponseGetAll(1: list<string> resources);
-  // as a response, ZeppelinServer send serialized value of resource
-  void resourceResponseGet(1: string resourceId, 2: binary object);
-  // as a response, ZeppelinServer send return object
-  void resourceResponseInvokeMethod(1: string invokeMessage, 2: binary object);
-  // get all resources in the interpreter process
   list<string> resourcePoolGetAll();
   // get value of resource
-  binary resourceGet(1: string sessionKey, 2: string paragraphId, 3: string resourceName);
+  binary resourceGet(1: string sessionId, 2: string paragraphId, 3: string resourceName);
   // remove resource
-  bool resourceRemove(1: string sessionKey, 2: string paragraphId, 3:string resourceName);
+  bool resourceRemove(1: string sessionId, 2: string paragraphId, 3:string resourceName);
   // invoke method on resource
-  binary resourceInvokeMethod(1: string sessionKey, 2: string paragraphId, 3:string resourceName, 4:string invokeMessage);
+  binary resourceInvokeMethod(1: string sessionId, 2: string paragraphId, 3:string resourceName, 4:string invokeMessage);
 
-  void angularObjectUpdate(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string
-  object);
-  void angularObjectAdd(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string object);
-  void angularObjectRemove(1: string name, 2: string sessionKey, 3: string paragraphId);
+  void angularObjectUpdate(1: string name, 2: string sessionId, 3: string paragraphId, 4: string object);
+  void angularObjectAdd(1: string name, 2: string sessionId, 3: string paragraphId, 4: string object);
+  void angularObjectRemove(1: string name, 2: string sessionId, 3: string paragraphId);
   void angularRegistryPush(1: string registry);
 
-  RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionKey, 4: string paragraphId);
+  RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionId, 4: string paragraphId);
   RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
   RemoteApplicationResult runApplication(1: string applicationInstanceId);
 
   void onReceivedZeppelinResource(1: string object);
 }
 
-service RemoteInterpreterCallbackService {
-  void callback(1: CallbackInfo callbackInfo);
-}
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/thrift/genthrift.sh
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/genthrift.sh b/zeppelin-interpreter/src/main/thrift/genthrift.sh
index 12c0521..23a295a 100755
--- a/zeppelin-interpreter/src/main/thrift/genthrift.sh
+++ b/zeppelin-interpreter/src/main/thrift/genthrift.sh
@@ -20,6 +20,7 @@
 rm -rf gen-java
 rm -rf ../java/org/apache/zeppelin/interpreter/thrift
 thrift --gen java RemoteInterpreterService.thrift
+thrift --gen java RemoteInterpreterEventService.thrift
 for file in gen-java/org/apache/zeppelin/interpreter/thrift/* ; do
   cat java_license_header.txt ${file} > ${file}.tmp
   mv -f ${file}.tmp ${file}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
index aefc484..529284f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.display;
 
+import org.apache.thrift.TException;
 import org.junit.Test;
 
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +28,7 @@ import static org.junit.Assert.assertNull;
 public class AngularObjectRegistryTest {
 
   @Test
-  public void testBasic() {
+  public void testBasic() throws TException {
     final AtomicInteger onAdd = new AtomicInteger(0);
     final AtomicInteger onUpdate = new AtomicInteger(0);
     final AtomicInteger onRemove = new AtomicInteger(0);
@@ -79,7 +80,7 @@ public class AngularObjectRegistryTest {
   }
 
   @Test
-  public void testGetDependOnScope() {
+  public void testGetDependOnScope() throws TException {
     AngularObjectRegistry registry = new AngularObjectRegistry("intpId", null);
     AngularObject ao1 = registry.add("name1", "o1", "noteId1", "paragraphId1");
     AngularObject ao2 = registry.add("name2", "o2", "noteId1", "paragraphId1");
@@ -98,7 +99,7 @@ public class AngularObjectRegistryTest {
   }
 
   @Test
-  public void testGetAllDependOnScope() {
+  public void testGetAllDependOnScope() throws TException {
     AngularObjectRegistry registry = new AngularObjectRegistry("intpId", null);
     AngularObject ao1 = registry.add("name1", "o", "noteId1", "paragraphId1");
     AngularObject ao2 = registry.add("name2", "o", "noteId1", "paragraphId1");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
index 0196526..b30439a 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.display;
 
+import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.junit.Test;
 
@@ -78,7 +79,7 @@ public class AngularObjectTest {
   }
 
   @Test
-  public void testListener() {
+  public void testListener() throws TException {
     final AtomicInteger updated = new AtomicInteger(0);
     AngularObject ao = new AngularObject("name", "value", "note1", null,
         new AngularObjectListener() {
@@ -103,7 +104,7 @@ public class AngularObjectTest {
   }
 
   @Test
-  public void testWatcher() throws InterruptedException {
+  public void testWatcher() throws InterruptedException, TException {
     final AtomicInteger updated = new AtomicInteger(0);
     final AtomicInteger onWatch = new AtomicInteger(0);
     AngularObject ao = new AngularObject("name", "value", "note1", null,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
index 8ad4841..4888f5e 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
@@ -28,8 +28,8 @@ public class InterpreterContextTest {
   public void testThreadLocal() {
     assertNull(InterpreterContext.get());
 
-    InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null,
-        null, null, null, null, null));
+    InterpreterContext.set(InterpreterContext.builder()
+        .build());
     assertNotNull(InterpreterContext.get());
 
     InterpreterContext.remove();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
index 1debe71..72b9f58 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.zeppelin.interpreter;
 
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -59,19 +58,14 @@ public class InterpreterTest {
     String paragraphText = "testParagraphText";
     String paragraphId = "testParagraphId";
     String user = "username";
-    InterpreterContext.set(new InterpreterContext(noteId,
-        paragraphId,
-        null,
-        paragraphTitle,
-        paragraphText,
-        new AuthenticationInfo("testUser", null, "testTicket"),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null));
+    InterpreterContext.set(
+        InterpreterContext.builder()
+            .setNoteId(noteId)
+            .setParagraphId(paragraphId)
+            .setParagraphText(paragraphText)
+            .setParagraphTitle(paragraphTitle)
+            .build());
+
     Properties p = new Properties();
     p.put("p1", "replName #{noteId}, #{paragraphTitle}, #{paragraphId}, #{paragraphText}, " +
         "#{replName}, #{noteId}, #{user}," +

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java
index 529fcfb..417b72c 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java
@@ -37,9 +37,7 @@ public class LazyOpenInterpreterTest {
     LazyOpenInterpreter lazyOpenInterpreter = new LazyOpenInterpreter(interpreter);
 
     assertFalse("Interpreter is not open", lazyOpenInterpreter.isOpen());
-    InterpreterContext interpreterContext =
-        new InterpreterContext("note", "id", null, "title", "text", null, null, null,
-            null, null, null, null, null);
+    InterpreterContext interpreterContext = mock(InterpreterContext.class);
     lazyOpenInterpreter.interpret("intp 1", interpreterContext);
     assertTrue("Interpeter is open", lazyOpenInterpreter.isOpen());
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java
index cf8daa3..14b4b6b 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java
@@ -19,9 +19,8 @@ package org.apache.zeppelin.interpreter;
 
 import org.apache.zeppelin.resource.LocalResourcePool;
 import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.Before;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Properties;
@@ -72,19 +71,12 @@ public class ZeppCtxtVariableTest {
 
     resourcePool = new LocalResourcePool("ZeppelinContextVariableInterpolationTest");
 
-    InterpreterContext.set(new InterpreterContext("InterpolationTestNoteId",
-            "InterpolationTestParagraphTitle",
-            null,
-            "InterpolationTestParagraphTitle",
-            "InterpolationTestParagraphText",
-            new AuthenticationInfo("InterpolationTestUser", null, "testTicket"),
-            null,
-            null,
-            null,
-            null,
-            resourcePool,
-            null,
-            null));
+    InterpreterContext context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setResourcePool(resourcePool)
+        .build();
+    InterpreterContext.set(context);
 
     interpreter = new TestInterpreter(new Properties());
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index 2ae1362..9719717 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -36,13 +36,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class RemoteInterpreterServerTest {
 
   @Test
   public void testStartStop() throws InterruptedException, IOException, TException {
     RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
-        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
 
     startRemoteInterpreterServer(server, 10 * 1000);
     stopRemoteInterpreterServer(server, 10 * 10000);
@@ -51,11 +52,11 @@ public class RemoteInterpreterServerTest {
   @Test
   public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
     RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
-        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
-
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
+    server.intpEventClient = mock(RemoteInterpreterEventClient.class);
     startRemoteInterpreterServer(server, 10 * 1000);
-    //just send an event on the client queue
-    server.eventClient.onAppStatusUpdate("", "", "", "");
+
+    server.intpEventClient.onAppStatusUpdate("", "", "", "");
     stopRemoteInterpreterServer(server, 10 * 10000);
   }
 
@@ -94,7 +95,8 @@ public class RemoteInterpreterServerTest {
   @Test
   public void testInterpreter() throws IOException, TException, InterruptedException {
     final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
-        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
+        RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
+    server.intpEventClient = mock(RemoteInterpreterEventClient.class);
 
     Map<String, String> intpProperties = new HashMap<>();
     intpProperties.put("property_1", "value_1");
@@ -104,24 +106,24 @@ public class RemoteInterpreterServerTest {
     server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
         intpProperties, "user_1");
     Test1Interpreter interpreter1 = (Test1Interpreter)
-        ((LazyOpenInterpreter) server.interpreterGroup.get("session_1").get(0))
+        ((LazyOpenInterpreter) server.getInterpreterGroup().get("session_1").get(0))
             .getInnerInterpreter();
-    assertEquals(1, server.interpreterGroup.getSessionNum());
-    assertEquals(1, server.interpreterGroup.get("session_1").size());
+    assertEquals(1, server.getInterpreterGroup().getSessionNum());
+    assertEquals(1, server.getInterpreterGroup().get("session_1").size());
     assertEquals(2, interpreter1.getProperties().size());
     assertEquals("value_1", interpreter1.getProperty("property_1"));
 
     // create Test2Interpreter in session_1
     server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
         intpProperties, "user_1");
-    assertEquals(2, server.interpreterGroup.get("session_1").size());
+    assertEquals(2, server.getInterpreterGroup().get("session_1").size());
 
     // create Test1Interpreter in session_2
     server.createInterpreter("group_1", "session_2", Test1Interpreter.class.getName(),
         intpProperties, "user_1");
-    assertEquals(2, server.interpreterGroup.getSessionNum());
-    assertEquals(2, server.interpreterGroup.get("session_1").size());
-    assertEquals(1, server.interpreterGroup.get("session_2").size());
+    assertEquals(2, server.getInterpreterGroup().getSessionNum());
+    assertEquals(2, server.getInterpreterGroup().get("session_1").size());
+    assertEquals(1, server.getInterpreterGroup().get("session_2").size());
 
     final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
     intpContext.setNoteId("note_1");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 4a823d9..43f909a 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -22,40 +22,7 @@ import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
-
 import org.apache.commons.lang.StringUtils;
-import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
-import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.quartz.SchedulerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.servlet.http.HttpServletRequest;
-
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObject;
@@ -66,7 +33,6 @@ import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.helium.HeliumPackage;
 import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -88,7 +54,6 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision
 import org.apache.zeppelin.notebook.socket.Message;
 import org.apache.zeppelin.notebook.socket.Message.OP;
 import org.apache.zeppelin.notebook.socket.WatcherMessage;
-import org.apache.zeppelin.rest.exception.ForbiddenException;
 import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.server.ZeppelinServer;
@@ -99,8 +64,42 @@ import org.apache.zeppelin.util.WatcherSecurityKey;
 import org.apache.zeppelin.utils.InterpreterBindingUtils;
 import org.apache.zeppelin.utils.SecurityUtils;
 import org.bitbucket.cowwoc.diffmatchpatch.DiffMatchPatch;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-/** Zeppelin websocket service. */
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Zeppelin websocket service.
+ */
 public class NotebookServer extends WebSocketServlet
     implements NotebookSocketListener,
         JobListenerFactory,
@@ -141,6 +140,8 @@ public class NotebookServer extends WebSocketServlet
   final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
   final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
 
+  private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
   /**
    * This is a special endpoint in the notebook websoket, Every connection in this Queue
    * will be able to watch every websocket event, it doesnt need to be listed into the map of
@@ -2183,71 +2184,61 @@ public class NotebookServer extends WebSocketServlet
     broadcast(noteId, msg);
   }
 
-  @Override
-  public void onGetParagraphRunners(String noteId, String paragraphId,
-      RemoteWorksEventListener callback) {
-    Notebook notebookIns = notebook();
-    List<InterpreterContextRunner> runner = new LinkedList<>();
 
-    if (notebookIns == null) {
-      LOG.info("intepreter request notebook instance is null");
-      callback.onFinished(notebookIns);
+  @Override
+  public void runParagraphs(String noteId,
+                            List<Integer> paragraphIndices,
+                            List<String> paragraphIds,
+                            String curParagraphId) throws IOException {
+    Notebook notebook = notebook();
+    final Note note = notebook.getNote(noteId);
+    final List<String> toBeRunParagraphIds = new ArrayList<>();
+    if (note == null) {
+      throw new IOException("Not existed noteId: " + noteId);
     }
-
-    try {
-      Note note = notebookIns.getNote(noteId);
-      if (note != null) {
-        if (paragraphId != null) {
-          Paragraph paragraph = note.getParagraph(paragraphId);
-          if (paragraph != null) {
-            runner.add(paragraph.getInterpreterContextRunner());
-          }
-        } else {
-          for (Paragraph p : note.getParagraphs()) {
-            runner.add(p.getInterpreterContextRunner());
-          }
+    if (!paragraphIds.isEmpty() && !paragraphIndices.isEmpty()) {
+      throw new IOException("Can not specify paragraphIds and paragraphIndices together");
+    }
+    if (paragraphIds != null && !paragraphIds.isEmpty()) {
+      for (String paragraphId : paragraphIds) {
+        if (note.getParagraph(paragraphId) == null) {
+          throw new IOException("Not existed paragraphId: " + paragraphId);
+        }
+        if (!paragraphId.equals(curParagraphId)) {
+          toBeRunParagraphIds.add(paragraphId);
         }
       }
-      callback.onFinished(runner);
-    } catch (NullPointerException e) {
-      LOG.warn(e.getMessage());
-      callback.onError();
     }
-  }
-
-  @Override
-  public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
-    Notebook notebookIns = notebook();
-    try {
-      if (notebookIns == null) {
-        throw new Exception("onRemoteRunParagraph notebook instance is null");
-      }
-      Note noteIns = notebookIns.getNote(noteId);
-      if (noteIns == null) {
-        throw new Exception(String.format("Can't found note id %s", noteId));
-      }
-
-      Paragraph paragraph = noteIns.getParagraph(paragraphId);
-      if (paragraph == null) {
-        throw new Exception(String.format("Can't found paragraph %s %s", noteId, paragraphId));
+    if (paragraphIndices != null && !paragraphIndices.isEmpty()) {
+      for (int paragraphIndex : paragraphIndices) {
+        if (note.getParagraph(paragraphIndex) == null) {
+          throw new IOException("Not existed paragraphIndex: " + paragraphIndex);
+        }
+        if (!note.getParagraph(paragraphIndex).getId().equals(curParagraphId)) {
+          toBeRunParagraphIds.add(note.getParagraph(paragraphIndex).getId());
+        }
       }
-
-      Set<String> userAndRoles = Sets.newHashSet();
-      userAndRoles.add(SecurityUtils.getPrincipal());
-      userAndRoles.addAll(SecurityUtils.getRoles());
-      if (!notebookIns.getNotebookAuthorization().hasRunAuthorization(userAndRoles, noteId)) {
-        throw new ForbiddenException(String.format("can't execute note %s", noteId));
+    }
+    // run the whole note except the current paragraph
+    if (paragraphIds.isEmpty() && paragraphIndices.isEmpty()) {
+      for (Paragraph paragraph : note.getParagraphs()) {
+        if (!paragraph.getId().equals(curParagraphId)) {
+          toBeRunParagraphIds.add(paragraph.getId());
+        }
       }
-
-      AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
-      paragraph.setAuthenticationInfo(subject);
-
-      noteIns.run(paragraphId);
-    } catch (Exception e) {
-      throw e;
     }
+    Runnable runThread = new Runnable() {
+      @Override
+      public void run() {
+        for (String paragraphId : toBeRunParagraphIds) {
+          note.run(paragraphId, true);
+        }
+      }
+    };
+    executorService.submit(runThread);
   }
 
+
   /**
    * Notebook Information Change event.
    */

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties
index 2bf5837..2364998 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -42,3 +42,5 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 log4j.logger.org.apache.hadoop=WARN
+
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 4b44b6f..5bfd69c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -42,7 +42,6 @@ import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.slf4j.Logger;
@@ -134,13 +133,9 @@ public class InterpreterSetting {
   // launcher in future when we have other launcher implementation. e.g. third party launcher
   // service like livy
   private transient InterpreterLauncher launcher;
-
   private transient LifecycleManager lifecycleManager;
-  ///////////////////////////////////////////////////////////////////////////////////////////
-
-
-
   private transient RecoveryStorage recoveryStorage;
+  private transient RemoteInterpreterEventServer interpreterEventServer;
   ///////////////////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -219,6 +214,11 @@ public class InterpreterSetting {
       return this;
     }
 
+    public Builder setRemoteInterpreterEventServer(RemoteInterpreterEventServer interpreterEventServer) {
+      interpreterSetting.interpreterEventServer = interpreterEventServer;
+      return this;
+    }
+
     public Builder setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener
                                                        remoteInterpreterProcessListener) {
       interpreterSetting.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
@@ -361,6 +361,12 @@ public class InterpreterSetting {
     return this;
   }
 
+  public InterpreterSetting setInterpreterEventServer(
+      RemoteInterpreterEventServer interpreterEventServer) {
+    this.interpreterEventServer = interpreterEventServer;
+    return this;
+  }
+
   public RecoveryStorage getRecoveryStorage() {
     return recoveryStorage;
   }
@@ -713,10 +719,8 @@ public class InterpreterSetting {
     }
     InterpreterLaunchContext launchContext = new
         InterpreterLaunchContext(properties, option, interpreterRunner, userName,
-        interpreterGroupId, id, group, name);
+        interpreterGroupId, id, group, name, interpreterEventServer.getPort(), interpreterEventServer.getHost());
     RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
-    process.setRemoteInterpreterEventPoller(
-        new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
     recoveryStorage.onInterpreterClientStart(process);
     return process;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 4d86588..d81e490 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -123,6 +123,7 @@ public class InterpreterSettingManager implements InterpreterSettingManagerMBean
   private LifecycleManager lifecycleManager;
   private RecoveryStorage recoveryStorage;
   private ConfigStorage configStorage;
+  private RemoteInterpreterEventServer interpreterEventServer;
 
   public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
                                    AngularObjectRegistryListener angularObjectRegistryListener,
@@ -173,7 +174,8 @@ public class InterpreterSettingManager implements InterpreterSettingManagerMBean
     LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName());
 
     this.configStorage = configStorage;
-
+    this.interpreterEventServer = new RemoteInterpreterEventServer(conf, this);
+    this.interpreterEventServer.start();
     init();
   }
 
@@ -203,6 +205,7 @@ public class InterpreterSettingManager implements InterpreterSettingManagerMBean
         .setDependencyResolver(dependencyResolver)
         .setLifecycleManager(lifecycleManager)
         .setRecoveryStorage(recoveryStorage)
+        .setInterpreterEventServer(interpreterEventServer)
         .postProcessing();
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index ecbaf16..b6b6b69 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -63,10 +63,6 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
           properties);
       remoteInterpreterProcess.start(userName);
       interpreterSetting.getLifecycleManager().onInterpreterProcessStarted(this);
-      remoteInterpreterProcess.getRemoteInterpreterEventPoller()
-          .setInterpreterProcess(remoteInterpreterProcess);
-      remoteInterpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(this);
-      remoteInterpreterProcess.getRemoteInterpreterEventPoller().start();
       getInterpreterSetting().getRecoveryStorage()
           .onInterpreterClientStart(remoteInterpreterProcess);
     }