You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/15 01:09:58 UTC
[03/13] zeppelin git commit: ZEPPELIN-2035. BI directional RPC
framework between ZeppelinServer and InterpreterProcess on top of thrift
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
new file mode 100644
index 0000000..308364f
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java
@@ -0,0 +1,828 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
+public class RunParagraphsEvent implements org.apache.thrift.TBase<RunParagraphsEvent, RunParagraphsEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RunParagraphsEvent> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RunParagraphsEvent");
+
+ private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField PARAGRAPH_IDS_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphIds", org.apache.thrift.protocol.TType.LIST, (short)2);
+ private static final org.apache.thrift.protocol.TField PARAGRAPH_INDICES_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphIndices", org.apache.thrift.protocol.TType.LIST, (short)3);
+ private static final org.apache.thrift.protocol.TField CUR_PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("curParagraphId", org.apache.thrift.protocol.TType.STRING, (short)4);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new RunParagraphsEventStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new RunParagraphsEventTupleSchemeFactory());
+ }
+
+ public String noteId; // Thrift field 1; DEFAULT requiredness (see metaDataMap), null when unset
+ public List<String> paragraphIds; // Thrift field 2; DEFAULT requiredness (see metaDataMap), null when unset
+ public List<Integer> paragraphIndices; // Thrift field 3; DEFAULT requiredness (see metaDataMap), null when unset
+ public String curParagraphId; // Thrift field 4; DEFAULT requiredness (see metaDataMap), null when unset
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ NOTE_ID((short)1, "noteId"),
+ PARAGRAPH_IDS((short)2, "paragraphIds"),
+ PARAGRAPH_INDICES((short)3, "paragraphIndices"),
+ CUR_PARAGRAPH_ID((short)4, "curParagraphId");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Returns the _Fields constant whose Thrift field id equals fieldId, or null if no field has that id.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // NOTE_ID
+ return NOTE_ID;
+ case 2: // PARAGRAPH_IDS
+ return PARAGRAPH_IDS;
+ case 3: // PARAGRAPH_INDICES
+ return PARAGRAPH_INDICES;
+ case 4: // CUR_PARAGRAPH_ID
+ return CUR_PARAGRAPH_ID;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Returns the _Fields constant whose field name (e.g. "noteId") equals name, or null if no field has that name.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // Per-field metadata (field name, requiredness, value type); every field of this struct is registered as DEFAULT
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.PARAGRAPH_IDS, new org.apache.thrift.meta_data.FieldMetaData("paragraphIds", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ tmpMap.put(_Fields.PARAGRAPH_INDICES, new org.apache.thrift.meta_data.FieldMetaData("paragraphIndices", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))));
+ tmpMap.put(_Fields.CUR_PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("curParagraphId", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap); // published as a read-only view of tmpMap
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RunParagraphsEvent.class, metaDataMap);
+ }
+
+ public RunParagraphsEvent() {
+ }
+
+ public RunParagraphsEvent(
+ String noteId,
+ List<String> paragraphIds,
+ List<Integer> paragraphIndices,
+ String curParagraphId)
+ {
+ this();
+ this.noteId = noteId;
+ this.paragraphIds = paragraphIds;
+ this.paragraphIndices = paragraphIndices;
+ this.curParagraphId = curParagraphId;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>: each set list is copied into a new ArrayList, and fields unset on <i>other</i> stay unset.
+ */
+ public RunParagraphsEvent(RunParagraphsEvent other) {
+ if (other.isSetNoteId()) {
+ this.noteId = other.noteId;
+ }
+ if (other.isSetParagraphIds()) {
+ List<String> __this__paragraphIds = new ArrayList<String>(other.paragraphIds);
+ this.paragraphIds = __this__paragraphIds;
+ }
+ if (other.isSetParagraphIndices()) {
+ List<Integer> __this__paragraphIndices = new ArrayList<Integer>(other.paragraphIndices);
+ this.paragraphIndices = __this__paragraphIndices;
+ }
+ if (other.isSetCurParagraphId()) {
+ this.curParagraphId = other.curParagraphId;
+ }
+ }
+
+ public RunParagraphsEvent deepCopy() {
+ return new RunParagraphsEvent(this);
+ }
+
+ @Override
+ public void clear() {
+ this.noteId = null;
+ this.paragraphIds = null;
+ this.paragraphIndices = null;
+ this.curParagraphId = null;
+ }
+
+ public String getNoteId() {
+ return this.noteId;
+ }
+
+ public RunParagraphsEvent setNoteId(String noteId) {
+ this.noteId = noteId;
+ return this;
+ }
+
+ public void unsetNoteId() {
+ this.noteId = null;
+ }
+
+ /** Returns true if field noteId is set (has been assigned a value) and false otherwise */
+ public boolean isSetNoteId() {
+ return this.noteId != null;
+ }
+
+ public void setNoteIdIsSet(boolean value) { // "set" means noteId != null; there is no separate flag
+ if (!value) { // value == true is a no-op
+ this.noteId = null; // false clears the field, which marks it unset
+ }
+ }
+
+ public int getParagraphIdsSize() {
+ return (this.paragraphIds == null) ? 0 : this.paragraphIds.size();
+ }
+
+ public java.util.Iterator<String> getParagraphIdsIterator() {
+ return (this.paragraphIds == null) ? null : this.paragraphIds.iterator(); // null, not an empty iterator, when the list is unset
+ }
+
+ public void addToParagraphIds(String elem) {
+ if (this.paragraphIds == null) {
+ this.paragraphIds = new ArrayList<String>();
+ }
+ this.paragraphIds.add(elem);
+ }
+
+ public List<String> getParagraphIds() {
+ return this.paragraphIds;
+ }
+
+ public RunParagraphsEvent setParagraphIds(List<String> paragraphIds) {
+ this.paragraphIds = paragraphIds;
+ return this;
+ }
+
+ public void unsetParagraphIds() {
+ this.paragraphIds = null;
+ }
+
+ /** Returns true if field paragraphIds is set (has been assigned a value) and false otherwise */
+ public boolean isSetParagraphIds() {
+ return this.paragraphIds != null;
+ }
+
+ public void setParagraphIdsIsSet(boolean value) { // "set" means paragraphIds != null; there is no separate flag
+ if (!value) { // value == true is a no-op
+ this.paragraphIds = null; // false clears the list, which marks it unset
+ }
+ }
+
+ public int getParagraphIndicesSize() {
+ return (this.paragraphIndices == null) ? 0 : this.paragraphIndices.size();
+ }
+
+ public java.util.Iterator<Integer> getParagraphIndicesIterator() {
+ return (this.paragraphIndices == null) ? null : this.paragraphIndices.iterator(); // null, not an empty iterator, when the list is unset
+ }
+
+ public void addToParagraphIndices(int elem) {
+ if (this.paragraphIndices == null) {
+ this.paragraphIndices = new ArrayList<Integer>();
+ }
+ this.paragraphIndices.add(elem);
+ }
+
+ public List<Integer> getParagraphIndices() {
+ return this.paragraphIndices;
+ }
+
+ public RunParagraphsEvent setParagraphIndices(List<Integer> paragraphIndices) {
+ this.paragraphIndices = paragraphIndices;
+ return this;
+ }
+
+ public void unsetParagraphIndices() {
+ this.paragraphIndices = null;
+ }
+
+ /** Returns true if field paragraphIndices is set (has been assigned a value) and false otherwise */
+ public boolean isSetParagraphIndices() {
+ return this.paragraphIndices != null;
+ }
+
+ public void setParagraphIndicesIsSet(boolean value) { // "set" means paragraphIndices != null; there is no separate flag
+ if (!value) { // value == true is a no-op
+ this.paragraphIndices = null; // false clears the list, which marks it unset
+ }
+ }
+
+ public String getCurParagraphId() {
+ return this.curParagraphId;
+ }
+
+ public RunParagraphsEvent setCurParagraphId(String curParagraphId) {
+ this.curParagraphId = curParagraphId;
+ return this;
+ }
+
+ public void unsetCurParagraphId() {
+ this.curParagraphId = null;
+ }
+
+ /** Returns true if field curParagraphId is set (has been assigned a value) and false otherwise */
+ public boolean isSetCurParagraphId() {
+ return this.curParagraphId != null;
+ }
+
+ public void setCurParagraphIdIsSet(boolean value) { // "set" means curParagraphId != null; there is no separate flag
+ if (!value) { // value == true is a no-op
+ this.curParagraphId = null; // false clears the field, which marks it unset
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case NOTE_ID:
+ if (value == null) {
+ unsetNoteId();
+ } else {
+ setNoteId((String)value);
+ }
+ break;
+
+ case PARAGRAPH_IDS:
+ if (value == null) {
+ unsetParagraphIds();
+ } else {
+ setParagraphIds((List<String>)value);
+ }
+ break;
+
+ case PARAGRAPH_INDICES:
+ if (value == null) {
+ unsetParagraphIndices();
+ } else {
+ setParagraphIndices((List<Integer>)value);
+ }
+ break;
+
+ case CUR_PARAGRAPH_ID:
+ if (value == null) {
+ unsetCurParagraphId();
+ } else {
+ setCurParagraphId((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case NOTE_ID:
+ return getNoteId();
+
+ case PARAGRAPH_IDS:
+ return getParagraphIds();
+
+ case PARAGRAPH_INDICES:
+ return getParagraphIndices();
+
+ case CUR_PARAGRAPH_ID:
+ return getCurParagraphId();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case NOTE_ID:
+ return isSetNoteId();
+ case PARAGRAPH_IDS:
+ return isSetParagraphIds();
+ case PARAGRAPH_INDICES:
+ return isSetParagraphIndices();
+ case CUR_PARAGRAPH_ID:
+ return isSetCurParagraphId();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof RunParagraphsEvent)
+ return this.equals((RunParagraphsEvent)that);
+ return false;
+ }
+
+ public boolean equals(RunParagraphsEvent that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_noteId = true && this.isSetNoteId();
+ boolean that_present_noteId = true && that.isSetNoteId();
+ if (this_present_noteId || that_present_noteId) {
+ if (!(this_present_noteId && that_present_noteId))
+ return false;
+ if (!this.noteId.equals(that.noteId))
+ return false;
+ }
+
+ boolean this_present_paragraphIds = true && this.isSetParagraphIds();
+ boolean that_present_paragraphIds = true && that.isSetParagraphIds();
+ if (this_present_paragraphIds || that_present_paragraphIds) {
+ if (!(this_present_paragraphIds && that_present_paragraphIds))
+ return false;
+ if (!this.paragraphIds.equals(that.paragraphIds))
+ return false;
+ }
+
+ boolean this_present_paragraphIndices = true && this.isSetParagraphIndices();
+ boolean that_present_paragraphIndices = true && that.isSetParagraphIndices();
+ if (this_present_paragraphIndices || that_present_paragraphIndices) {
+ if (!(this_present_paragraphIndices && that_present_paragraphIndices))
+ return false;
+ if (!this.paragraphIndices.equals(that.paragraphIndices))
+ return false;
+ }
+
+ boolean this_present_curParagraphId = true && this.isSetCurParagraphId();
+ boolean that_present_curParagraphId = true && that.isSetCurParagraphId();
+ if (this_present_curParagraphId || that_present_curParagraphId) {
+ if (!(this_present_curParagraphId && that_present_curParagraphId))
+ return false;
+ if (!this.curParagraphId.equals(that.curParagraphId))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_noteId = true && (isSetNoteId());
+ list.add(present_noteId);
+ if (present_noteId)
+ list.add(noteId);
+
+ boolean present_paragraphIds = true && (isSetParagraphIds());
+ list.add(present_paragraphIds);
+ if (present_paragraphIds)
+ list.add(paragraphIds);
+
+ boolean present_paragraphIndices = true && (isSetParagraphIndices());
+ list.add(present_paragraphIndices);
+ if (present_paragraphIndices)
+ list.add(paragraphIndices);
+
+ boolean present_curParagraphId = true && (isSetCurParagraphId());
+ list.add(present_curParagraphId);
+ if (present_curParagraphId)
+ list.add(curParagraphId);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(RunParagraphsEvent other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetNoteId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetParagraphIds()).compareTo(other.isSetParagraphIds());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetParagraphIds()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphIds, other.paragraphIds);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetParagraphIndices()).compareTo(other.isSetParagraphIndices());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetParagraphIndices()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphIndices, other.paragraphIndices);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetCurParagraphId()).compareTo(other.isSetCurParagraphId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCurParagraphId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.curParagraphId, other.curParagraphId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("RunParagraphsEvent(");
+ boolean first = true;
+
+ sb.append("noteId:");
+ if (this.noteId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.noteId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("paragraphIds:");
+ if (this.paragraphIds == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.paragraphIds);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("paragraphIndices:");
+ if (this.paragraphIndices == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.paragraphIndices);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("curParagraphId:");
+ if (this.curParagraphId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.curParagraphId);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // Intentionally empty: every field has DEFAULT requiredness in metaDataMap, so there are no required fields to check,
+ // and no field is struct-typed (only strings and lists of string/i32), so there is no nested struct to validate.
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class RunParagraphsEventStandardSchemeFactory implements SchemeFactory {
+ public RunParagraphsEventStandardScheme getScheme() {
+ return new RunParagraphsEventStandardScheme();
+ }
+ }
+
+ private static class RunParagraphsEventStandardScheme extends StandardScheme<RunParagraphsEvent> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, RunParagraphsEvent struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // NOTE_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.noteId = iprot.readString();
+ struct.setNoteIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // PARAGRAPH_IDS
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list8 = iprot.readListBegin();
+ struct.paragraphIds = new ArrayList<String>(_list8.size);
+ String _elem9;
+ for (int _i10 = 0; _i10 < _list8.size; ++_i10)
+ {
+ _elem9 = iprot.readString();
+ struct.paragraphIds.add(_elem9);
+ }
+ iprot.readListEnd();
+ }
+ struct.setParagraphIdsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // PARAGRAPH_INDICES
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list11 = iprot.readListBegin();
+ struct.paragraphIndices = new ArrayList<Integer>(_list11.size);
+ int _elem12;
+ for (int _i13 = 0; _i13 < _list11.size; ++_i13)
+ {
+ _elem12 = iprot.readI32();
+ struct.paragraphIndices.add(_elem12);
+ }
+ iprot.readListEnd();
+ }
+ struct.setParagraphIndicesIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 4: // CUR_PARAGRAPH_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.curParagraphId = iprot.readString();
+ struct.setCurParagraphIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, RunParagraphsEvent struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.noteId != null) {
+ oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
+ oprot.writeString(struct.noteId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.paragraphIds != null) {
+ oprot.writeFieldBegin(PARAGRAPH_IDS_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.paragraphIds.size()));
+ for (String _iter14 : struct.paragraphIds)
+ {
+ oprot.writeString(_iter14);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ if (struct.paragraphIndices != null) {
+ oprot.writeFieldBegin(PARAGRAPH_INDICES_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, struct.paragraphIndices.size()));
+ for (int _iter15 : struct.paragraphIndices)
+ {
+ oprot.writeI32(_iter15);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ if (struct.curParagraphId != null) {
+ oprot.writeFieldBegin(CUR_PARAGRAPH_ID_FIELD_DESC);
+ oprot.writeString(struct.curParagraphId);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class RunParagraphsEventTupleSchemeFactory implements SchemeFactory {
+ public RunParagraphsEventTupleScheme getScheme() {
+ return new RunParagraphsEventTupleScheme();
+ }
+ }
+
+ private static class RunParagraphsEventTupleScheme extends TupleScheme<RunParagraphsEvent> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, RunParagraphsEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetNoteId()) {
+ optionals.set(0);
+ }
+ if (struct.isSetParagraphIds()) {
+ optionals.set(1);
+ }
+ if (struct.isSetParagraphIndices()) {
+ optionals.set(2);
+ }
+ if (struct.isSetCurParagraphId()) {
+ optionals.set(3);
+ }
+ oprot.writeBitSet(optionals, 4);
+ if (struct.isSetNoteId()) {
+ oprot.writeString(struct.noteId);
+ }
+ if (struct.isSetParagraphIds()) {
+ {
+ oprot.writeI32(struct.paragraphIds.size());
+ for (String _iter16 : struct.paragraphIds)
+ {
+ oprot.writeString(_iter16);
+ }
+ }
+ }
+ if (struct.isSetParagraphIndices()) {
+ {
+ oprot.writeI32(struct.paragraphIndices.size());
+ for (int _iter17 : struct.paragraphIndices)
+ {
+ oprot.writeI32(_iter17);
+ }
+ }
+ }
+ if (struct.isSetCurParagraphId()) {
+ oprot.writeString(struct.curParagraphId);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, RunParagraphsEvent struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(4);
+ if (incoming.get(0)) {
+ struct.noteId = iprot.readString();
+ struct.setNoteIdIsSet(true);
+ }
+ if (incoming.get(1)) {
+ {
+ org.apache.thrift.protocol.TList _list18 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.paragraphIds = new ArrayList<String>(_list18.size);
+ String _elem19;
+ for (int _i20 = 0; _i20 < _list18.size; ++_i20)
+ {
+ _elem19 = iprot.readString();
+ struct.paragraphIds.add(_elem19);
+ }
+ }
+ struct.setParagraphIdsIsSet(true);
+ }
+ if (incoming.get(2)) {
+ {
+ org.apache.thrift.protocol.TList _list21 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I32, iprot.readI32());
+ struct.paragraphIndices = new ArrayList<Integer>(_list21.size);
+ int _elem22;
+ for (int _i23 = 0; _i23 < _list21.size; ++_i23)
+ {
+ _elem22 = iprot.readI32();
+ struct.paragraphIndices.add(_elem22);
+ }
+ }
+ struct.setParagraphIndicesIsSet(true);
+ }
+ if (incoming.get(3)) {
+ struct.curParagraphId = iprot.readString();
+ struct.setCurParagraphIdIsSet(true);
+ }
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
index a537d02..f31c69e 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ZeppelinServerResourceParagraphRunner.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
/**
- * Autogenerated by Thrift Compiler (0.9.3)
+ * Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2018-05-22")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase<ZeppelinServerResourceParagraphRunner, ZeppelinServerResourceParagraphRunner._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResourceParagraphRunner> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner");
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
new file mode 100644
index 0000000..6d245c5
--- /dev/null
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+include "RemoteInterpreterService.thrift"
+
+namespace java org.apache.zeppelin.interpreter.thrift
+
+struct RegisterInfo {
+ 1: string host,
+ 2: i32 port,
+ 3: string interpreterGroupId
+}
+
+struct OutputAppendEvent {
+ 1: string noteId,
+ 2: string paragraphId,
+ 3: i32 index,
+ 4: string data,
+ 5: string appId
+}
+
+struct OutputUpdateEvent {
+ 1: string noteId,
+ 2: string paragraphId,
+ 3: i32 index,
+ 4: string type,
+ 5: string data,
+ 6: string appId
+}
+
+struct OutputUpdateAllEvent {
+ 1: string noteId,
+ 2: string paragraphId,
+ 3: list<RemoteInterpreterService.RemoteInterpreterResultMessage> msg,
+}
+
+struct RunParagraphsEvent {
+ 1: string noteId,
+ 2: list<string> paragraphIds,
+ 3: list<i32> paragraphIndices,
+ 4: string curParagraphId
+}
+
+struct AngularObjectId {
+ 1: string noteId,
+ 2: string paragraphId,
+ 3: string name
+}
+
+struct AppOutputAppendEvent {
+ 1: string noteId,
+ 2: string paragraphId,
+ 3: string appId,
+ 4: i32 index,
+ 5: string data
+}
+
+struct AppOutputUpdateEvent {
+ 1: string noteId,
+ 2: string paragraphId,
+ 3: string appId,
+ 4: i32 index,
+ 5: string type,
+ 6: string data
+}
+
+struct AppStatusUpdateEvent {
+ 1: string noteId,
+ 2: string paragraphId,
+ 3: string appId,
+ 4: string status
+}
+
+service RemoteInterpreterEventService {
+ void registerInterpreterProcess(1: RegisterInfo registerInfo);
+ void appendOutput(1: OutputAppendEvent event);
+ void updateOutput(1: OutputUpdateEvent event);
+ void updateAllOutput(1: OutputUpdateAllEvent event);
+
+ void appendAppOutput(1: AppOutputAppendEvent event);
+ void updateAppOutput(1: AppOutputUpdateEvent event);
+ void updateAppStatus(1: AppStatusUpdateEvent event);
+
+ void runParagraphs(1: RunParagraphsEvent event);
+
+ void addAngularObject(1: string intpGroupId, 2: string json);
+ void updateAngularObject(1: string intpGroupId, 2: string json);
+ void removeAngularObject(1: string intpGroupId, 2: string noteId, 3: string paragraphId, 4: string name);
+
+ void sendMetaInfo(1: string intpGroupId, 2: string json);
+ void sendParagraphInfo(1: string intpGroupId, 2: string json);
+
+ list<string> getAllResources(1: string intpGroupId);
+ binary getResource(1: string resourceIdJson);
+ binary invokeMethod(1: string intpGroupId, 2: string invokeMethodJson);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index 559648a..fcc14b0 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -28,7 +28,6 @@ struct RemoteInterpreterContext {
7: string config, // json serialized config
8: string gui, // json serialized gui
9: string noteGui, // json serialized note gui
- 10: string runners // json serialized runner
}
struct RemoteInterpreterResultMessage {
@@ -90,55 +89,40 @@ struct InterpreterCompletion {
3: string meta
}
-struct CallbackInfo {
- 1: string host,
- 2: i32 port
-}
+
service RemoteInterpreterService {
- void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties, 5: string userName);
- void open(1: string sessionKey, 2: string className);
- void close(1: string sessionKey, 2: string className);
- RemoteInterpreterResult interpret(1: string sessionKey, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
- void cancel(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
- i32 getProgress(1: string sessionKey, 2: string className, 3: RemoteInterpreterContext interpreterContext);
- string getFormType(1: string sessionKey, 2: string className);
- list<InterpreterCompletion> completion(1: string sessionKey, 2: string className, 3: string buf, 4: i32 cursor, 5: RemoteInterpreterContext interpreterContext);
+ void createInterpreter(1: string intpGroupId, 2: string sessionId, 3: string className, 4: map<string, string> properties, 5: string userName);
+ void open(1: string sessionId, 2: string className);
+ void close(1: string sessionId, 2: string className);
+ RemoteInterpreterResult interpret(1: string sessionId, 2: string className, 3: string st, 4: RemoteInterpreterContext interpreterContext);
+ void cancel(1: string sessionId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
+ i32 getProgress(1: string sessionId, 2: string className, 3: RemoteInterpreterContext interpreterContext);
+ string getFormType(1: string sessionId, 2: string className);
+ list<InterpreterCompletion> completion(1: string sessionId, 2: string className, 3: string buf, 4: i32 cursor, 5: RemoteInterpreterContext interpreterContext);
void shutdown();
- string getStatus(1: string sessionKey, 2:string jobId);
-
- RemoteInterpreterEvent getEvent();
+ string getStatus(1: string sessionId, 2:string jobId);
- // as a response, ZeppelinServer send list of resources to Interpreter process
- void resourcePoolResponseGetAll(1: list<string> resources);
- // as a response, ZeppelinServer send serialized value of resource
- void resourceResponseGet(1: string resourceId, 2: binary object);
- // as a response, ZeppelinServer send return object
- void resourceResponseInvokeMethod(1: string invokeMessage, 2: binary object);
- // get all resources in the interpreter process
list<string> resourcePoolGetAll();
// get value of resource
- binary resourceGet(1: string sessionKey, 2: string paragraphId, 3: string resourceName);
+ binary resourceGet(1: string sessionId, 2: string paragraphId, 3: string resourceName);
// remove resource
- bool resourceRemove(1: string sessionKey, 2: string paragraphId, 3:string resourceName);
+ bool resourceRemove(1: string sessionId, 2: string paragraphId, 3:string resourceName);
// invoke method on resource
- binary resourceInvokeMethod(1: string sessionKey, 2: string paragraphId, 3:string resourceName, 4:string invokeMessage);
+ binary resourceInvokeMethod(1: string sessionId, 2: string paragraphId, 3:string resourceName, 4:string invokeMessage);
- void angularObjectUpdate(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string
- object);
- void angularObjectAdd(1: string name, 2: string sessionKey, 3: string paragraphId, 4: string object);
- void angularObjectRemove(1: string name, 2: string sessionKey, 3: string paragraphId);
+ void angularObjectUpdate(1: string name, 2: string sessionId, 3: string paragraphId, 4: string object);
+ void angularObjectAdd(1: string name, 2: string sessionId, 3: string paragraphId, 4: string object);
+ void angularObjectRemove(1: string name, 2: string sessionId, 3: string paragraphId);
void angularRegistryPush(1: string registry);
- RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionKey, 4: string paragraphId);
+ RemoteApplicationResult loadApplication(1: string applicationInstanceId, 2: string packageInfo, 3: string sessionId, 4: string paragraphId);
RemoteApplicationResult unloadApplication(1: string applicationInstanceId);
RemoteApplicationResult runApplication(1: string applicationInstanceId);
void onReceivedZeppelinResource(1: string object);
}
-service RemoteInterpreterCallbackService {
- void callback(1: CallbackInfo callbackInfo);
-}
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/thrift/genthrift.sh
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/genthrift.sh b/zeppelin-interpreter/src/main/thrift/genthrift.sh
index 12c0521..23a295a 100755
--- a/zeppelin-interpreter/src/main/thrift/genthrift.sh
+++ b/zeppelin-interpreter/src/main/thrift/genthrift.sh
@@ -20,6 +20,7 @@
rm -rf gen-java
rm -rf ../java/org/apache/zeppelin/interpreter/thrift
thrift --gen java RemoteInterpreterService.thrift
+thrift --gen java RemoteInterpreterEventService.thrift
for file in gen-java/org/apache/zeppelin/interpreter/thrift/* ; do
cat java_license_header.txt ${file} > ${file}.tmp
mv -f ${file}.tmp ${file}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
index aefc484..529284f 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectRegistryTest.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.display;
+import org.apache.thrift.TException;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +28,7 @@ import static org.junit.Assert.assertNull;
public class AngularObjectRegistryTest {
@Test
- public void testBasic() {
+ public void testBasic() throws TException {
final AtomicInteger onAdd = new AtomicInteger(0);
final AtomicInteger onUpdate = new AtomicInteger(0);
final AtomicInteger onRemove = new AtomicInteger(0);
@@ -79,7 +80,7 @@ public class AngularObjectRegistryTest {
}
@Test
- public void testGetDependOnScope() {
+ public void testGetDependOnScope() throws TException {
AngularObjectRegistry registry = new AngularObjectRegistry("intpId", null);
AngularObject ao1 = registry.add("name1", "o1", "noteId1", "paragraphId1");
AngularObject ao2 = registry.add("name2", "o2", "noteId1", "paragraphId1");
@@ -98,7 +99,7 @@ public class AngularObjectRegistryTest {
}
@Test
- public void testGetAllDependOnScope() {
+ public void testGetAllDependOnScope() throws TException {
AngularObjectRegistry registry = new AngularObjectRegistry("intpId", null);
AngularObject ao1 = registry.add("name1", "o", "noteId1", "paragraphId1");
AngularObject ao2 = registry.add("name2", "o", "noteId1", "paragraphId1");
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
index 0196526..b30439a 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/display/AngularObjectTest.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.display;
+import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.junit.Test;
@@ -78,7 +79,7 @@ public class AngularObjectTest {
}
@Test
- public void testListener() {
+ public void testListener() throws TException {
final AtomicInteger updated = new AtomicInteger(0);
AngularObject ao = new AngularObject("name", "value", "note1", null,
new AngularObjectListener() {
@@ -103,7 +104,7 @@ public class AngularObjectTest {
}
@Test
- public void testWatcher() throws InterruptedException {
+ public void testWatcher() throws InterruptedException, TException {
final AtomicInteger updated = new AtomicInteger(0);
final AtomicInteger onWatch = new AtomicInteger(0);
AngularObject ao = new AngularObject("name", "value", "note1", null,
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
index 8ad4841..4888f5e 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterContextTest.java
@@ -28,8 +28,8 @@ public class InterpreterContextTest {
public void testThreadLocal() {
assertNull(InterpreterContext.get());
- InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null,
- null, null, null, null, null));
+ InterpreterContext.set(InterpreterContext.builder()
+ .build());
assertNotNull(InterpreterContext.get());
InterpreterContext.remove();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
index 1debe71..72b9f58 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterTest.java
@@ -17,7 +17,6 @@
package org.apache.zeppelin.interpreter;
-import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Test;
import java.util.Properties;
@@ -59,19 +58,14 @@ public class InterpreterTest {
String paragraphText = "testParagraphText";
String paragraphId = "testParagraphId";
String user = "username";
- InterpreterContext.set(new InterpreterContext(noteId,
- paragraphId,
- null,
- paragraphTitle,
- paragraphText,
- new AuthenticationInfo("testUser", null, "testTicket"),
- null,
- null,
- null,
- null,
- null,
- null,
- null));
+ InterpreterContext.set(
+ InterpreterContext.builder()
+ .setNoteId(noteId)
+ .setParagraphId(paragraphId)
+ .setParagraphText(paragraphText)
+ .setParagraphTitle(paragraphTitle)
+ .build());
+
Properties p = new Properties();
p.put("p1", "replName #{noteId}, #{paragraphTitle}, #{paragraphId}, #{paragraphText}, " +
"#{replName}, #{noteId}, #{user}," +
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java
index 529fcfb..417b72c 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/LazyOpenInterpreterTest.java
@@ -37,9 +37,7 @@ public class LazyOpenInterpreterTest {
LazyOpenInterpreter lazyOpenInterpreter = new LazyOpenInterpreter(interpreter);
assertFalse("Interpreter is not open", lazyOpenInterpreter.isOpen());
- InterpreterContext interpreterContext =
- new InterpreterContext("note", "id", null, "title", "text", null, null, null,
- null, null, null, null, null);
+ InterpreterContext interpreterContext = mock(InterpreterContext.class);
lazyOpenInterpreter.interpret("intp 1", interpreterContext);
assertTrue("Interpeter is open", lazyOpenInterpreter.isOpen());
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java
index cf8daa3..14b4b6b 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/ZeppCtxtVariableTest.java
@@ -19,9 +19,8 @@ package org.apache.zeppelin.interpreter;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.resource.ResourcePool;
-import org.apache.zeppelin.user.AuthenticationInfo;
-import org.junit.Before;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.Properties;
@@ -72,19 +71,12 @@ public class ZeppCtxtVariableTest {
resourcePool = new LocalResourcePool("ZeppelinContextVariableInterpolationTest");
- InterpreterContext.set(new InterpreterContext("InterpolationTestNoteId",
- "InterpolationTestParagraphTitle",
- null,
- "InterpolationTestParagraphTitle",
- "InterpolationTestParagraphText",
- new AuthenticationInfo("InterpolationTestUser", null, "testTicket"),
- null,
- null,
- null,
- null,
- resourcePool,
- null,
- null));
+ InterpreterContext context = InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setResourcePool(resourcePool)
+ .build();
+ InterpreterContext.set(context);
interpreter = new TestInterpreter(new Properties());
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index 2ae1362..9719717 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -36,13 +36,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
public class RemoteInterpreterServerTest {
@Test
public void testStartStop() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
startRemoteInterpreterServer(server, 10 * 1000);
stopRemoteInterpreterServer(server, 10 * 10000);
@@ -51,11 +52,11 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
-
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
+ server.intpEventClient = mock(RemoteInterpreterEventClient.class);
startRemoteInterpreterServer(server, 10 * 1000);
- //just send an event on the client queue
- server.eventClient.onAppStatusUpdate("", "", "", "");
+
+ server.intpEventClient.onAppStatusUpdate("", "", "", "");
stopRemoteInterpreterServer(server, 10 * 10000);
}
@@ -94,7 +95,8 @@ public class RemoteInterpreterServerTest {
@Test
public void testInterpreter() throws IOException, TException, InterruptedException {
final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true);
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
+ server.intpEventClient = mock(RemoteInterpreterEventClient.class);
Map<String, String> intpProperties = new HashMap<>();
intpProperties.put("property_1", "value_1");
@@ -104,24 +106,24 @@ public class RemoteInterpreterServerTest {
server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
intpProperties, "user_1");
Test1Interpreter interpreter1 = (Test1Interpreter)
- ((LazyOpenInterpreter) server.interpreterGroup.get("session_1").get(0))
+ ((LazyOpenInterpreter) server.getInterpreterGroup().get("session_1").get(0))
.getInnerInterpreter();
- assertEquals(1, server.interpreterGroup.getSessionNum());
- assertEquals(1, server.interpreterGroup.get("session_1").size());
+ assertEquals(1, server.getInterpreterGroup().getSessionNum());
+ assertEquals(1, server.getInterpreterGroup().get("session_1").size());
assertEquals(2, interpreter1.getProperties().size());
assertEquals("value_1", interpreter1.getProperty("property_1"));
// create Test2Interpreter in session_1
server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
intpProperties, "user_1");
- assertEquals(2, server.interpreterGroup.get("session_1").size());
+ assertEquals(2, server.getInterpreterGroup().get("session_1").size());
// create Test1Interpreter in session_2
server.createInterpreter("group_1", "session_2", Test1Interpreter.class.getName(),
intpProperties, "user_1");
- assertEquals(2, server.interpreterGroup.getSessionNum());
- assertEquals(2, server.interpreterGroup.get("session_1").size());
- assertEquals(1, server.interpreterGroup.get("session_2").size());
+ assertEquals(2, server.getInterpreterGroup().getSessionNum());
+ assertEquals(2, server.getInterpreterGroup().get("session_1").size());
+ assertEquals(1, server.getInterpreterGroup().get("session_2").size());
final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
intpContext.setNoteId("note_1");
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 4a823d9..43f909a 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -22,40 +22,7 @@ import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
-
import org.apache.commons.lang.StringUtils;
-import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
-import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.quartz.SchedulerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.servlet.http.HttpServletRequest;
-
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
@@ -66,7 +33,6 @@ import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -88,7 +54,6 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
import org.apache.zeppelin.notebook.socket.WatcherMessage;
-import org.apache.zeppelin.rest.exception.ForbiddenException;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.server.ZeppelinServer;
@@ -99,8 +64,42 @@ import org.apache.zeppelin.util.WatcherSecurityKey;
import org.apache.zeppelin.utils.InterpreterBindingUtils;
import org.apache.zeppelin.utils.SecurityUtils;
import org.bitbucket.cowwoc.diffmatchpatch.DiffMatchPatch;
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-/** Zeppelin websocket service. */
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Zeppelin websocket service.
+ */
public class NotebookServer extends WebSocketServlet
implements NotebookSocketListener,
JobListenerFactory,
@@ -141,6 +140,8 @@ public class NotebookServer extends WebSocketServlet
final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>();
final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>();
+ private ExecutorService executorService = Executors.newFixedThreadPool(10);
+
/**
* This is a special endpoint in the notebook websoket, Every connection in this Queue
* will be able to watch every websocket event, it doesnt need to be listed into the map of
@@ -2183,71 +2184,61 @@ public class NotebookServer extends WebSocketServlet
broadcast(noteId, msg);
}
- @Override
- public void onGetParagraphRunners(String noteId, String paragraphId,
- RemoteWorksEventListener callback) {
- Notebook notebookIns = notebook();
- List<InterpreterContextRunner> runner = new LinkedList<>();
- if (notebookIns == null) {
- LOG.info("intepreter request notebook instance is null");
- callback.onFinished(notebookIns);
+ /**
+ * Collects the paragraphs of a note that should be run and runs them, in order, on a
+ * background thread. The paragraph identified by {@code curParagraphId} is always skipped.
+ *
+ * <p>Paragraphs are selected either by id or by index, never both. When neither list
+ * contains an entry (null or empty), every paragraph of the note is selected.
+ *
+ * @param noteId id of the note whose paragraphs are run
+ * @param paragraphIndices indices of the paragraphs to run; may be null or empty
+ * @param paragraphIds ids of the paragraphs to run; may be null or empty
+ * @param curParagraphId id of the paragraph that must not be run
+ * @throws IOException if the note or a requested paragraph does not exist, or if both
+ * paragraphIds and paragraphIndices are specified
+ */
+ @Override
+ public void runParagraphs(String noteId,
+ List<Integer> paragraphIndices,
+ List<String> paragraphIds,
+ String curParagraphId) throws IOException {
+ Notebook notebook = notebook();
+ final Note note = notebook.getNote(noteId);
+ final List<String> toBeRunParagraphIds = new ArrayList<>();
+ if (note == null) {
+ throw new IOException("Not existed noteId: " + noteId);
}
-
- try {
- Note note = notebookIns.getNote(noteId);
- if (note != null) {
- if (paragraphId != null) {
- Paragraph paragraph = note.getParagraph(paragraphId);
- if (paragraph != null) {
- runner.add(paragraph.getInterpreterContextRunner());
- }
- } else {
- for (Paragraph p : note.getParagraphs()) {
- runner.add(p.getInterpreterContextRunner());
- }
+ // Treat a null list like an empty one so that none of the checks below can throw
+ // NullPointerException.
+ final boolean hasParagraphIds = paragraphIds != null && !paragraphIds.isEmpty();
+ final boolean hasParagraphIndices = paragraphIndices != null && !paragraphIndices.isEmpty();
+ if (hasParagraphIds && hasParagraphIndices) {
+ throw new IOException("Can not specify paragraphIds and paragraphIndices together");
+ }
+ if (hasParagraphIds) {
+ for (String paragraphId : paragraphIds) {
+ if (note.getParagraph(paragraphId) == null) {
+ throw new IOException("Not existed paragraphId: " + paragraphId);
+ }
+ if (!paragraphId.equals(curParagraphId)) {
+ toBeRunParagraphIds.add(paragraphId);
}
}
- callback.onFinished(runner);
- } catch (NullPointerException e) {
- LOG.warn(e.getMessage());
- callback.onError();
}
- }
-
- @Override
- public void onRemoteRunParagraph(String noteId, String paragraphId) throws Exception {
- Notebook notebookIns = notebook();
- try {
- if (notebookIns == null) {
- throw new Exception("onRemoteRunParagraph notebook instance is null");
- }
- Note noteIns = notebookIns.getNote(noteId);
- if (noteIns == null) {
- throw new Exception(String.format("Can't found note id %s", noteId));
- }
-
- Paragraph paragraph = noteIns.getParagraph(paragraphId);
- if (paragraph == null) {
- throw new Exception(String.format("Can't found paragraph %s %s", noteId, paragraphId));
+ if (hasParagraphIndices) {
+ for (int paragraphIndex : paragraphIndices) {
+ // look the paragraph up once instead of once per use
+ Paragraph indexedParagraph = note.getParagraph(paragraphIndex);
+ if (indexedParagraph == null) {
+ throw new IOException("Not existed paragraphIndex: " + paragraphIndex);
+ }
+ if (!indexedParagraph.getId().equals(curParagraphId)) {
+ toBeRunParagraphIds.add(indexedParagraph.getId());
+ }
}
-
- Set<String> userAndRoles = Sets.newHashSet();
- userAndRoles.add(SecurityUtils.getPrincipal());
- userAndRoles.addAll(SecurityUtils.getRoles());
- if (!notebookIns.getNotebookAuthorization().hasRunAuthorization(userAndRoles, noteId)) {
- throw new ForbiddenException(String.format("can't execute note %s", noteId));
+ }
+ // run the whole note except the current paragraph
+ if (!hasParagraphIds && !hasParagraphIndices) {
+ for (Paragraph paragraph : note.getParagraphs()) {
+ if (!paragraph.getId().equals(curParagraphId)) {
+ toBeRunParagraphIds.add(paragraph.getId());
+ }
}
-
- AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal());
- paragraph.setAuthenticationInfo(subject);
-
- noteIns.run(paragraphId);
- } catch (Exception e) {
- throw e;
}
+ final String failureMessage = "Fail to run paragraphs of note: " + noteId;
+ Runnable runThread = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (String paragraphId : toBeRunParagraphIds) {
+ note.run(paragraphId, true);
+ }
+ } catch (RuntimeException e) {
+ // The Future returned by submit() is never read, so without this log a failure
+ // here would disappear silently. Remaining paragraphs are still not run.
+ LOG.error(failureMessage, e);
+ }
+ }
+ };
+ executorService.submit(runThread);
}
+
/**
* Notebook Information Change event.
*/
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties
index 2bf5837..2364998 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -42,3 +42,5 @@ log4j.logger.DataNucleus.Datastore=ERROR
# Log all JDBC parameters
log4j.logger.org.hibernate.type=ALL
log4j.logger.org.apache.hadoop=WARN
+
+log4j.logger.org.apache.zeppelin.interpreter=DEBUG
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 4b44b6f..5bfd69c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -42,7 +42,6 @@ import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.slf4j.Logger;
@@ -134,13 +133,9 @@ public class InterpreterSetting {
// launcher in future when we have other launcher implementation. e.g. third party launcher
// service like livy
private transient InterpreterLauncher launcher;
-
private transient LifecycleManager lifecycleManager;
- ///////////////////////////////////////////////////////////////////////////////////////////
-
-
-
private transient RecoveryStorage recoveryStorage;
+ private transient RemoteInterpreterEventServer interpreterEventServer;
///////////////////////////////////////////////////////////////////////////////////////////
/**
@@ -219,6 +214,11 @@ public class InterpreterSetting {
return this;
}
+ public Builder setRemoteInterpreterEventServer(RemoteInterpreterEventServer interpreterEventServer) {
+ interpreterSetting.interpreterEventServer = interpreterEventServer; // written straight into the InterpreterSetting held by this builder
+ return this; // returns the builder so calls can be chained
+ }
+
public Builder setRemoteInterpreterProcessListener(RemoteInterpreterProcessListener
remoteInterpreterProcessListener) {
interpreterSetting.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
@@ -361,6 +361,12 @@ public class InterpreterSetting {
return this;
}
+ public InterpreterSetting setInterpreterEventServer(
+ RemoteInterpreterEventServer interpreterEventServer) {
+ this.interpreterEventServer = interpreterEventServer; // must be non-null before a process is launched: getPort()/getHost() are called on it without a null check when the InterpreterLaunchContext is built
+ return this; // returns this setting so setters can be chained
+ }
+
public RecoveryStorage getRecoveryStorage() {
return recoveryStorage;
}
@@ -713,10 +719,8 @@ public class InterpreterSetting {
}
InterpreterLaunchContext launchContext = new
InterpreterLaunchContext(properties, option, interpreterRunner, userName,
- interpreterGroupId, id, group, name);
+ interpreterGroupId, id, group, name, interpreterEventServer.getPort(), interpreterEventServer.getHost());
RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
- process.setRemoteInterpreterEventPoller(
- new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
recoveryStorage.onInterpreterClientStart(process);
return process;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 4d86588..d81e490 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -123,6 +123,7 @@ public class InterpreterSettingManager implements InterpreterSettingManagerMBean
private LifecycleManager lifecycleManager;
private RecoveryStorage recoveryStorage;
private ConfigStorage configStorage;
+ private RemoteInterpreterEventServer interpreterEventServer;
public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
AngularObjectRegistryListener angularObjectRegistryListener,
@@ -173,7 +174,8 @@ public class InterpreterSettingManager implements InterpreterSettingManagerMBean
LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName());
this.configStorage = configStorage;
-
+ this.interpreterEventServer = new RemoteInterpreterEventServer(conf, this);
+ this.interpreterEventServer.start();
init();
}
@@ -203,6 +205,7 @@ public class InterpreterSettingManager implements InterpreterSettingManagerMBean
.setDependencyResolver(dependencyResolver)
.setLifecycleManager(lifecycleManager)
.setRecoveryStorage(recoveryStorage)
+ .setInterpreterEventServer(interpreterEventServer)
.postProcessing();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index ecbaf16..b6b6b69 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -63,10 +63,6 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
properties);
remoteInterpreterProcess.start(userName);
interpreterSetting.getLifecycleManager().onInterpreterProcessStarted(this);
- remoteInterpreterProcess.getRemoteInterpreterEventPoller()
- .setInterpreterProcess(remoteInterpreterProcess);
- remoteInterpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(this);
- remoteInterpreterProcess.getRemoteInterpreterEventPoller().start();
getInterpreterSetting().getRecoveryStorage()
.onInterpreterClientStart(remoteInterpreterProcess);
}