You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/15 01:10:06 UTC
[11/13] zeppelin git commit: ZEPPELIN-2035. BI directional RPC
framework between ZeppelinServer and InterpreterProcess on top of thrift
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java
deleted file mode 100644
index e1410d6..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/RemoteWorksController.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter;
-
-import java.util.List;
-
-/**
- * zeppelin job for Remote works controller by interpreter
- *
- */
-public interface RemoteWorksController {
- List<InterpreterContextRunner> getRemoteContextRunner(String noteId);
- List<InterpreterContextRunner> getRemoteContextRunner(String noteId, String paragraphId);
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
index 28c40f2..136d866 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLaunchContext.java
@@ -35,6 +35,8 @@ public class InterpreterLaunchContext {
private String interpreterSettingId;
private String interpreterSettingGroup;
private String interpreterSettingName;
+ private int zeppelinServerRPCPort;
+ private String zeppelinServerHost;
public InterpreterLaunchContext(Properties properties,
InterpreterOption option,
@@ -43,7 +45,9 @@ public class InterpreterLaunchContext {
String interpreterGroupId,
String interpreterSettingId,
String interpreterSettingGroup,
- String interpreterSettingName) {
+ String interpreterSettingName,
+ int zeppelinServerRPCPort,
+ String zeppelinServerHost) {
this.properties = properties;
this.option = option;
this.runner = runner;
@@ -52,6 +56,8 @@ public class InterpreterLaunchContext {
this.interpreterSettingId = interpreterSettingId;
this.interpreterSettingGroup = interpreterSettingGroup;
this.interpreterSettingName = interpreterSettingName;
+ this.zeppelinServerRPCPort = zeppelinServerRPCPort;
+ this.zeppelinServerHost = zeppelinServerHost;
}
public Properties getProperties() {
@@ -85,4 +91,12 @@ public class InterpreterLaunchContext {
public String getUserName() {
return userName;
}
+
+ public int getZeppelinServerRPCPort() {
+ return zeppelinServerRPCPort;
+ }
+
+ public String getZeppelinServerHost() {
+ return zeppelinServerHost;
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java
deleted file mode 100644
index 5015a3f..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.zeppelin.interpreter.remote;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- *
- * Wrapper arnd RemoteInterpreterEventClient
- * to expose methods in the client
- *
- */
-public class RemoteEventClient implements RemoteEventClientWrapper {
-
- private RemoteInterpreterEventClient client;
-
- public RemoteEventClient(RemoteInterpreterEventClient client) {
- this.client = client;
- }
-
- @Override
- public void onMetaInfosReceived(Map<String, String> infos) {
- client.onMetaInfosReceived(infos);
- }
-
- @Override
- public void onParaInfosReceived(String noteId, String paragraphId, Map<String, String> infos) {
- Map<String, String> paraInfos = new HashMap<String, String>(infos);
- paraInfos.put("noteId", noteId);
- paraInfos.put("paraId", paragraphId);
- client.onParaInfosReceived(paraInfos);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
deleted file mode 100644
index c0b1251..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.remote;
-
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
-
-/**
- *
- */
-public class RemoteInterpreterContextRunner extends InterpreterContextRunner {
-
- public RemoteInterpreterContextRunner(String noteId, String paragraphId) {
- super(noteId, paragraphId);
- }
-
- @Override
- public void run() {
- // this class should be used only for gson deserialize abstract class
- // code should not reach here
- throw new RuntimeException("Assert");
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
index 9ca8a32..d6de06c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java
@@ -17,14 +17,19 @@
package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
+import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
-import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
-import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
+import org.apache.zeppelin.interpreter.thrift.AppOutputAppendEvent;
+import org.apache.zeppelin.interpreter.thrift.AppOutputUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.AppStatusUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.OutputAppendEvent;
+import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent;
+import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService;
+import org.apache.zeppelin.interpreter.thrift.RunParagraphsEvent;
import org.apache.zeppelin.resource.RemoteResource;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourceId;
@@ -35,141 +40,67 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
- * Thread connection ZeppelinServer -> RemoteInterpreterServer does not provide
- * remote method invocation from RemoteInterpreterServer -> ZeppelinServer
- *
- * This class provides event send and get response from RemoteInterpreterServer to
- * ZeppelinServer.
- *
- * RemoteInterpreterEventPoller is counter part in ZeppelinServer
+ * This class is used to communicate with ZeppelinServer via thrift.
+ * All the methods are synchronized because thrift client is not thread safe.
*/
-public class RemoteInterpreterEventClient implements ResourcePoolConnector {
- private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventClient.class);
- private final List<RemoteInterpreterEvent> eventQueue = new LinkedList<>();
- private final List<ResourceSet> getAllResourceResponse = new LinkedList<>();
- private final Map<ResourceId, Object> getResourceResponse = new HashMap<>();
- private final Map<InvokeResourceMethodEventMessage, Object> getInvokeResponse = new HashMap<>();
+public class RemoteInterpreterEventClient implements ResourcePoolConnector,
+ AngularObjectRegistryListener {
+ private final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventClient.class);
private final Gson gson = new Gson();
- /**
- * Run paragraph
- * @param runner
- */
- public void getZeppelinServerNoteRunner(
- String eventOwnerKey, ZeppelinServerResourceParagraphRunner runner) {
- RemoteZeppelinServerResource eventBody = new RemoteZeppelinServerResource();
- eventBody.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
- eventBody.setOwnerKey(eventOwnerKey);
- eventBody.setData(runner);
-
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE,
- gson.toJson(eventBody)));
- }
-
- /**
- * Run paragraph
- * @param runner
- */
- public void run(InterpreterContextRunner runner) {
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
- gson.toJson(runner)));
- }
-
- /**
- * notify new angularObject creation
- * @param object
- */
- public void angularObjectAdd(AngularObject object) {
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, object.toJson()));
- }
+ private RemoteInterpreterEventService.Client intpEventServiceClient;
+ private String intpGroupId;
- /**
- * notify angularObject update
- */
- public void angularObjectUpdate(AngularObject object) {
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, object.toJson()));
+ public RemoteInterpreterEventClient(RemoteInterpreterEventService.Client intpEventServiceClient) {
+ this.intpEventServiceClient = intpEventServiceClient;
}
- /**
- * notify angularObject removal
- */
- public void angularObjectRemove(String name, String noteId, String paragraphId) {
- Map<String, String> removeObject = new HashMap<>();
- removeObject.put("name", name);
- removeObject.put("noteId", noteId);
- removeObject.put("paragraphId", paragraphId);
-
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(removeObject)));
+ public void setIntpGroupId(String intpGroupId) {
+ this.intpGroupId = intpGroupId;
}
-
/**
* Get all resources except for specific resourcePool
+ *
* @return
*/
@Override
- public ResourceSet getAllResources() {
- // request
- sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL, null));
-
- synchronized (getAllResourceResponse) {
- while (getAllResourceResponse.isEmpty()) {
- try {
- getAllResourceResponse.wait();
- } catch (InterruptedException e) {
- logger.warn(e.getMessage(), e);
- }
+ public synchronized ResourceSet getAllResources() {
+ try {
+ List<String> resources = intpEventServiceClient.getAllResources(intpGroupId);
+ ResourceSet resourceSet = new ResourceSet();
+ for (String res : resources) {
+ RemoteResource resource = RemoteResource.fromJson(res);
+ resource.setResourcePoolConnector(this);
+ resourceSet.add(resource);
}
- ResourceSet resourceSet = getAllResourceResponse.remove(0);
return resourceSet;
+ } catch (TException e) {
+ LOGGER.warn("Fail to getAllResources", e);
+ return null;
}
}
@Override
- public Object readResource(ResourceId resourceId) {
- logger.debug("Request Read Resource {} from ZeppelinServer", resourceId.getName());
- synchronized (getResourceResponse) {
- // wait for previous response consumed
- while (getResourceResponse.containsKey(resourceId)) {
- try {
- getResourceResponse.wait();
- } catch (InterruptedException e) {
- logger.warn(e.getMessage(), e);
- }
- }
-
- // send request
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.RESOURCE_GET,
- resourceId.toJson()));
-
- // wait for response
- while (!getResourceResponse.containsKey(resourceId)) {
- try {
- getResourceResponse.wait();
- } catch (InterruptedException e) {
- logger.warn(e.getMessage(), e);
- }
- }
- Object o = getResourceResponse.remove(resourceId);
- getResourceResponse.notifyAll();
+ public synchronized Object readResource(ResourceId resourceId) {
+ try {
+ ByteBuffer buffer = intpEventServiceClient.getResource(resourceId.toJson());
+ Object o = Resource.deserializeObject(buffer);
return o;
+ } catch (TException | IOException | ClassNotFoundException e) {
+ LOGGER.warn("Failt to readResource: " + resourceId, e);
+ return null;
}
}
/**
* Invoke method and save result in resourcePool as another resource
+ *
* @param resourceId
* @param methodName
* @param paramTypes
@@ -177,49 +108,51 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
* @return
*/
@Override
- public Object invokeMethod(
+ public synchronized Object invokeMethod(
ResourceId resourceId,
String methodName,
Class[] paramTypes,
Object[] params) {
- logger.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());
-
- InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
- resourceId,
- methodName,
- paramTypes,
- params,
- null);
-
- synchronized (getInvokeResponse) {
- // wait for previous response consumed
- while (getInvokeResponse.containsKey(invokeMethod)) {
- try {
- getInvokeResponse.wait();
- } catch (InterruptedException e) {
- logger.warn(e.getMessage(), e);
- }
- }
- // send request
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
- invokeMethod.toJson()));
- // wait for response
- while (!getInvokeResponse.containsKey(invokeMethod)) {
- try {
- getInvokeResponse.wait();
- } catch (InterruptedException e) {
- logger.warn(e.getMessage(), e);
- }
- }
- Object o = getInvokeResponse.remove(invokeMethod);
- getInvokeResponse.notifyAll();
- return o;
- }
+ LOGGER.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());
+
+ return null;
+ // InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
+ // resourceId,
+ // methodName,
+ // paramTypes,
+ // params,
+ // null);
+ //
+ // synchronized (getInvokeResponse) {
+ // // wait for previous response consumed
+ // while (getInvokeResponse.containsKey(invokeMethod)) {
+ // try {
+ // getInvokeResponse.wait();
+ // } catch (InterruptedException e) {
+ // LOGGER.warn(e.getMessage(), e);
+ // }
+ // }
+ // // send request
+ // sendEvent(new RemoteInterpreterEvent(
+ // RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
+ // invokeMethod.toJson()));
+ // // wait for response
+ // while (!getInvokeResponse.containsKey(invokeMethod)) {
+ // try {
+ // getInvokeResponse.wait();
+ // } catch (InterruptedException e) {
+ // LOGGER.warn(e.getMessage(), e);
+ // }
+ // }
+ // Object o = getInvokeResponse.remove(invokeMethod);
+ // getInvokeResponse.notifyAll();
+ // return o;
+ // }
}
/**
* Invoke method and save result in resourcePool as another resource
+ *
* @param resourceId
* @param methodName
* @param paramTypes
@@ -228,267 +161,180 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
* @return
*/
@Override
- public Resource invokeMethod(
+ public synchronized Resource invokeMethod(
ResourceId resourceId,
String methodName,
Class[] paramTypes,
Object[] params,
String returnResourceName) {
- logger.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());
-
- InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
- resourceId,
- methodName,
- paramTypes,
- params,
- returnResourceName);
-
- synchronized (getInvokeResponse) {
- // wait for previous response consumed
- while (getInvokeResponse.containsKey(invokeMethod)) {
- try {
- getInvokeResponse.wait();
- } catch (InterruptedException e) {
- logger.warn(e.getMessage(), e);
- }
- }
- // send request
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
- invokeMethod.toJson()));
- // wait for response
- while (!getInvokeResponse.containsKey(invokeMethod)) {
- try {
- getInvokeResponse.wait();
- } catch (InterruptedException e) {
- logger.warn(e.getMessage(), e);
- }
- }
- Resource o = (Resource) getInvokeResponse.remove(invokeMethod);
- getInvokeResponse.notifyAll();
- return o;
- }
+ LOGGER.debug("Request Invoke method {} of Resource {}", methodName, resourceId.getName());
+
+ return null;
+ // InvokeResourceMethodEventMessage invokeMethod = new InvokeResourceMethodEventMessage(
+ // resourceId,
+ // methodName,
+ // paramTypes,
+ // params,
+ // returnResourceName);
+ //
+ // synchronized (getInvokeResponse) {
+ // // wait for previous response consumed
+ // while (getInvokeResponse.containsKey(invokeMethod)) {
+ // try {
+ // getInvokeResponse.wait();
+ // } catch (InterruptedException e) {
+ // LOGGER.warn(e.getMessage(), e);
+ // }
+ // }
+ // // send request
+ // sendEvent(new RemoteInterpreterEvent(
+ // RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD,
+ // invokeMethod.toJson()));
+ // // wait for response
+ // while (!getInvokeResponse.containsKey(invokeMethod)) {
+ // try {
+ // getInvokeResponse.wait();
+ // } catch (InterruptedException e) {
+ // LOGGER.warn(e.getMessage(), e);
+ // }
+ // }
+ // Resource o = (Resource) getInvokeResponse.remove(invokeMethod);
+ // getInvokeResponse.notifyAll();
+ // return o;
+ // }
}
- /**
- * Supposed to call from RemoteInterpreterEventPoller
- */
- public void putResponseGetAllResources(List<String> resources) {
- logger.debug("ResourceSet from ZeppelinServer");
- ResourceSet resourceSet = new ResourceSet();
-
- for (String res : resources) {
- RemoteResource resource = RemoteResource.fromJson(res);
- resource.setResourcePoolConnector(this);
- resourceSet.add(resource);
- }
-
- synchronized (getAllResourceResponse) {
- getAllResourceResponse.add(resourceSet);
- getAllResourceResponse.notify();
+ public synchronized void onInterpreterOutputAppend(
+ String noteId, String paragraphId, int outputIndex, String output) {
+ try {
+ intpEventServiceClient.appendOutput(
+ new OutputAppendEvent(noteId, paragraphId, outputIndex, output, null));
+ } catch (TException e) {
+ LOGGER.warn("Fail to appendOutput", e);
}
}
- /**
- * Supposed to call from RemoteInterpreterEventPoller
- * @param resourceId json serialized ResourceId
- * @param object java serialized of the object
- */
- public void putResponseGetResource(String resourceId, ByteBuffer object) {
- ResourceId rid = ResourceId.fromJson(resourceId);
-
- logger.debug("Response resource {} from RemoteInterpreter", rid.getName());
-
- Object o = null;
+ public synchronized void onInterpreterOutputUpdate(
+ String noteId, String paragraphId, int outputIndex,
+ InterpreterResult.Type type, String output) {
try {
- o = Resource.deserializeObject(object);
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- } catch (ClassNotFoundException e) {
- logger.error(e.getMessage(), e);
- }
-
- synchronized (getResourceResponse) {
- getResourceResponse.put(rid, o);
- getResourceResponse.notifyAll();
+ intpEventServiceClient.updateOutput(
+ new OutputUpdateEvent(noteId, paragraphId, outputIndex, type.name(), output, null));
+ } catch (TException e) {
+ LOGGER.warn("Fail to updateOutput", e);
}
}
-
- /**
- * Supposed to call from RemoteInterpreterEventPoller
- * @param invokeMessage json serialized InvokeMessage
- * @param object java serialized of the object
- */
- public void putResponseInvokeMethod(
- InvokeResourceMethodEventMessage invokeMessage, ByteBuffer object) {
- Object o = null;
+ public synchronized void onInterpreterOutputUpdateAll(
+ String noteId, String paragraphId, List<InterpreterResultMessage> messages) {
try {
- o = Resource.deserializeObject(object);
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- } catch (ClassNotFoundException e) {
- logger.error(e.getMessage(), e);
+ intpEventServiceClient.updateAllOutput(
+ new OutputUpdateAllEvent(noteId, paragraphId, convertToThrift(messages)));
+ } catch (TException e) {
+ LOGGER.warn("Fail to updateAllOutput", e);
}
+ }
- synchronized (getInvokeResponse) {
- getInvokeResponse.put(invokeMessage, o);
- getInvokeResponse.notifyAll();
+ private List<org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage>
+ convertToThrift(List<InterpreterResultMessage> messages) {
+ List<org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage> thriftMessages =
+ new ArrayList<>();
+ for (InterpreterResultMessage message : messages) {
+ thriftMessages.add(
+ new org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage(
+ message.getType().name(), message.getData()));
}
+ return thriftMessages;
}
- /**
- * Supposed to call from RemoteInterpreterEventPoller
- * @param invokeMessage invoke message
- * @param resource remote resource
- */
- public void putResponseInvokeMethod(
- InvokeResourceMethodEventMessage invokeMessage, Resource resource) {
- synchronized (getInvokeResponse) {
- getInvokeResponse.put(invokeMessage, resource);
- getInvokeResponse.notifyAll();
+ public synchronized void runParagraphs(String noteId,
+ List<String> paragraphIds,
+ List<Integer> paragraphIndices,
+ String curParagraphId) {
+ RunParagraphsEvent event =
+ new RunParagraphsEvent(noteId, paragraphIds, paragraphIndices, curParagraphId);
+ try {
+ intpEventServiceClient.runParagraphs(event);
+ } catch (TException e) {
+ LOGGER.warn("Fail to runParagraphs: " + event, e);
}
}
- /**
- * Supposed to call from RemoteInterpreterEventPoller
- * @return next available event
- */
- public RemoteInterpreterEvent pollEvent() {
- synchronized (eventQueue) {
- if (eventQueue.isEmpty()) {
- try {
- eventQueue.wait(1000);
- } catch (InterruptedException e) {
- // ignore exception
- }
- }
-
- if (eventQueue.isEmpty()) {
- return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
- } else {
- RemoteInterpreterEvent event = eventQueue.remove(0);
- logger.debug("Send event {}", event.getType());
- return event;
- }
+ public synchronized void onAppOutputAppend(
+ String noteId, String paragraphId, int index, String appId, String output) {
+ AppOutputAppendEvent event =
+ new AppOutputAppendEvent(noteId, paragraphId, appId, index, output);
+ try {
+ intpEventServiceClient.appendAppOutput(event);
+ } catch (TException e) {
+ LOGGER.warn("Fail to appendAppOutput: " + event, e);
}
}
- public void onInterpreterOutputAppend(
- String noteId, String paragraphId, int outputIndex, String output) {
- Map<String, String> appendOutput = new HashMap<>();
- appendOutput.put("noteId", noteId);
- appendOutput.put("paragraphId", paragraphId);
- appendOutput.put("index", Integer.toString(outputIndex));
- appendOutput.put("data", output);
-
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.OUTPUT_APPEND,
- gson.toJson(appendOutput)));
- }
- public void onInterpreterOutputUpdate(
- String noteId, String paragraphId, int outputIndex,
+ public synchronized void onAppOutputUpdate(
+ String noteId, String paragraphId, int index, String appId,
InterpreterResult.Type type, String output) {
- Map<String, String> appendOutput = new HashMap<>();
- appendOutput.put("noteId", noteId);
- appendOutput.put("paragraphId", paragraphId);
- appendOutput.put("index", Integer.toString(outputIndex));
- appendOutput.put("type", type.name());
- appendOutput.put("data", output);
-
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.OUTPUT_UPDATE,
- gson.toJson(appendOutput)));
- }
-
- public void onInterpreterOutputUpdateAll(
- String noteId, String paragraphId, List<InterpreterResultMessage> messages) {
- Map<String, Object> appendOutput = new HashMap<>();
- appendOutput.put("noteId", noteId);
- appendOutput.put("paragraphId", paragraphId);
- appendOutput.put("messages", messages);
-
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.OUTPUT_UPDATE_ALL,
- gson.toJson(appendOutput)));
+ AppOutputUpdateEvent event =
+ new AppOutputUpdateEvent(noteId, paragraphId, appId, index, type.name(), output);
+ try {
+ intpEventServiceClient.updateAppOutput(event);
+ } catch (TException e) {
+ LOGGER.warn("Fail to updateAppOutput: " + event, e);
+ }
}
- private void sendEvent(RemoteInterpreterEvent event) {
- logger.debug("Send Event: " + event);
- synchronized (eventQueue) {
- eventQueue.add(event);
- eventQueue.notifyAll();
+ public synchronized void onAppStatusUpdate(String noteId, String paragraphId, String appId,
+ String status) {
+ AppStatusUpdateEvent event = new AppStatusUpdateEvent(noteId, paragraphId, appId, status);
+ try {
+ intpEventServiceClient.updateAppStatus(event);
+ } catch (TException e) {
+ LOGGER.warn("Fail to updateAppStatus: " + event, e);
}
}
- public void onAppOutputAppend(
- String noteId, String paragraphId, int index, String appId, String output) {
- Map<String, Object> appendOutput = new HashMap<>();
- appendOutput.put("noteId", noteId);
- appendOutput.put("paragraphId", paragraphId);
- appendOutput.put("index", Integer.toString(index));
- appendOutput.put("appId", appId);
- appendOutput.put("data", output);
-
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.OUTPUT_APPEND,
- gson.toJson(appendOutput)));
+ public synchronized void onMetaInfosReceived(Map<String, String> infos) {
+ try {
+ intpEventServiceClient.sendMetaInfo(intpGroupId, gson.toJson(infos));
+ } catch (TException e) {
+ LOGGER.warn("Fail to sendMetaInfo: " + infos, e);
+ }
}
-
- public void onAppOutputUpdate(
- String noteId, String paragraphId, int index, String appId,
- InterpreterResult.Type type, String output) {
- Map<String, Object> appendOutput = new HashMap<>();
- appendOutput.put("noteId", noteId);
- appendOutput.put("paragraphId", paragraphId);
- appendOutput.put("index", Integer.toString(index));
- appendOutput.put("appId", appId);
- appendOutput.put("type", type);
- appendOutput.put("data", output);
- logger.debug("onAppoutputUpdate = {}", output);
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.OUTPUT_UPDATE,
- gson.toJson(appendOutput)));
+ public synchronized void onParaInfosReceived(Map<String, String> infos) {
+ try {
+ intpEventServiceClient.sendParagraphInfo(intpGroupId, gson.toJson(infos));
+ } catch (TException e) {
+ LOGGER.warn("Fail to onParaInfosReceived: " + infos, e);
+ }
}
- public void onAppStatusUpdate(String noteId, String paragraphId, String appId, String status) {
- Map<String, String> appendOutput = new HashMap<>();
- appendOutput.put("noteId", noteId);
- appendOutput.put("paragraphId", paragraphId);
- appendOutput.put("appId", appId);
- appendOutput.put("status", status);
-
- sendEvent(new RemoteInterpreterEvent(
- RemoteInterpreterEventType.APP_STATUS_UPDATE,
- gson.toJson(appendOutput)));
+ @Override
+ public synchronized void onAdd(String interpreterGroupId, AngularObject object) {
+ try {
+ intpEventServiceClient.addAngularObject(intpGroupId, object.toJson());
+ } catch (TException e) {
+ LOGGER.warn("Fail to add AngularObject: " + object, e);
+ }
}
- public void onMetaInfosReceived(Map<String, String> infos) {
- sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.META_INFOS,
- gson.toJson(infos)));
+ @Override
+ public synchronized void onUpdate(String interpreterGroupId, AngularObject object) {
+ try {
+ intpEventServiceClient.updateAngularObject(intpGroupId, object.toJson());
+ } catch (TException e) {
+ LOGGER.warn("Fail to update AngularObject: " + object, e);
+ }
}
- public void onParaInfosReceived(Map<String, String> infos) {
- sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.PARA_INFOS,
- gson.toJson(infos)));
- }
- /**
- * Wait for eventQueue becomes empty
- */
- public void waitForEventQueueBecomesEmpty(long atMost) {
- long startTime = System.currentTimeMillis();
- synchronized (eventQueue) {
- while (!eventQueue.isEmpty() && (System.currentTimeMillis() - startTime) < atMost) {
- try {
- eventQueue.wait(100);
- } catch (InterruptedException e) {
- // ignore exception
- }
- }
- if (!eventQueue.isEmpty())
- eventQueue.clear();
+ @Override
+ public synchronized void onRemove(String interpreterGroupId, String name, String noteId,
+ String paragraphId) {
+ try {
+ intpEventServiceClient.removeAngularObject(intpGroupId, noteId, paragraphId, name);
+ } catch (TException e) {
+ LOGGER.warn("Fail to remove AngularObject", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index e4db469..a8ca10f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -20,13 +20,16 @@ package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.Application;
import org.apache.zeppelin.helium.ApplicationContext;
@@ -50,17 +53,15 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
-import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
-import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RegisterInfo;
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
-import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.DistributedResourcePool;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
@@ -97,33 +98,31 @@ import java.util.concurrent.ConcurrentMap;
* Accepting thrift connections from ZeppelinServer.
*/
public class RemoteInterpreterServer extends Thread
- implements RemoteInterpreterService.Iface, AngularObjectRegistryListener {
+ implements RemoteInterpreterService.Iface {
private static Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class);
- InterpreterGroup interpreterGroup;
- AngularObjectRegistry angularObjectRegistry;
- InterpreterHookRegistry hookRegistry;
- DistributedResourcePool resourcePool;
+ private String interpreterGroupId;
+ private InterpreterGroup interpreterGroup;
+ private AngularObjectRegistry angularObjectRegistry;
+ private InterpreterHookRegistry hookRegistry;
+ private DistributedResourcePool resourcePool;
private ApplicationLoader appLoader;
+ private Gson gson = new Gson();
- Gson gson = new Gson();
-
- RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
- private String callbackHost;
- private int callbackPort;
+ private String intpEventServerHost;
private String host;
private int port;
private TThreadPoolServer server;
+ RemoteInterpreterEventService.Client intpEventServiceClient;
- RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient();
+ RemoteInterpreterEventClient intpEventClient;
private DependencyResolver depLoader;
private final Map<String, RunningApplication> runningApplications =
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
private Map<String, Object> remoteWorksResponsePool;
- private ZeppelinRemoteWorksController remoteWorksController;
private final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;
@@ -132,27 +131,41 @@ public class RemoteInterpreterServer extends Thread
private boolean isTest;
- public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange)
+ public RemoteInterpreterServer(String intpEventServerHost,
+ int intpEventServerPort,
+ String interpreterGroupId,
+ String portRange)
throws IOException, TTransportException {
- this(callbackHost, callbackPort, portRange, false);
+ this(intpEventServerHost, intpEventServerPort, portRange, interpreterGroupId, false);
}
- public RemoteInterpreterServer(String callbackHost, int callbackPort, String portRange,
- boolean isTest) throws TTransportException, IOException {
- if (null != callbackHost) {
- this.callbackHost = callbackHost;
- this.callbackPort = callbackPort;
+ public RemoteInterpreterServer(String intpEventServerHost,
+ int intpEventServerPort,
+ String portRange,
+ String interpreterGroupId,
+ boolean isTest)
+ throws TTransportException, IOException {
+ if (null != intpEventServerHost) {
+ this.intpEventServerHost = intpEventServerHost;
+ if (!isTest) {
+ TTransport transport = new TSocket(intpEventServerHost, intpEventServerPort);
+ transport.open();
+ TProtocol protocol = new TBinaryProtocol(transport);
+ intpEventServiceClient = new RemoteInterpreterEventService.Client(protocol);
+ intpEventClient = new RemoteInterpreterEventClient(intpEventServiceClient);
+ }
} else {
// DevInterpreter
- this.port = callbackPort;
+ this.port = intpEventServerPort;
}
this.isTest = isTest;
-
- processor = new RemoteInterpreterService.Processor<>(this);
+ this.interpreterGroupId = interpreterGroupId;
+ RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
+ new RemoteInterpreterService.Processor<>(this);
TServerSocket serverTransport;
- if (null == callbackHost) {
+ if (null == intpEventServerHost) {
// Dev Interpreter
- serverTransport = new TServerSocket(callbackPort);
+ serverTransport = new TServerSocket(intpEventServerPort);
} else {
serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
this.port = serverTransport.getServerSocket().getLocalPort();
@@ -163,12 +176,11 @@ public class RemoteInterpreterServer extends Thread
new TThreadPoolServer.Args(serverTransport).processor(processor));
logger.info("Starting remote interpreter server on port {}", port);
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
- remoteWorksController = new ZeppelinRemoteWorksController(this, remoteWorksResponsePool);
}
@Override
public void run() {
- if (null != callbackHost && !isTest) {
+ if (null != intpEventServerHost && !isTest) {
new Thread(new Runnable() {
boolean interrupted = false;
@@ -183,12 +195,11 @@ public class RemoteInterpreterServer extends Thread
}
if (!interrupted) {
- CallbackInfo callbackInfo = new CallbackInfo(host, port);
+ RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
try {
- RemoteInterpreterUtils
- .registerInterpreter(callbackHost, callbackPort, callbackInfo);
+ intpEventServiceClient.registerInterpreterProcess(registerInfo);
} catch (TException e) {
- logger.error("Error while registering interpreter: {}", callbackInfo, e);
+ logger.error("Error while registering interpreter: {}", registerInfo, e);
try {
shutdown();
} catch (TException e1) {
@@ -205,7 +216,6 @@ public class RemoteInterpreterServer extends Thread
@Override
public void shutdown() throws TException {
logger.info("Shutting down...");
- eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT);
if (interpreterGroup != null) {
for (List<Interpreter> session : interpreterGroup.values()) {
for (Interpreter interpreter : session) {
@@ -254,21 +264,20 @@ public class RemoteInterpreterServer extends Thread
public static void main(String[] args)
throws TTransportException, InterruptedException, IOException {
- Class klass = RemoteInterpreterServer.class;
- URL location = klass.getResource('/' + klass.getName().replace('.', '/') + ".class");
- logger.info("URL:" + location);
- String callbackHost = null;
+ String zeppelinServerHost = null;
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
String portRange = ":";
+ String interpreterGroupId = null;
if (args.length > 0) {
- callbackHost = args[0];
+ zeppelinServerHost = args[0];
port = Integer.parseInt(args[1]);
- if (args.length > 2) {
- portRange = args[2];
+ interpreterGroupId = args[2];
+ if (args.length > 3) {
+ portRange = args[3];
}
}
RemoteInterpreterServer remoteInterpreterServer =
- new RemoteInterpreterServer(callbackHost, port, portRange);
+ new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
remoteInterpreterServer.start();
remoteInterpreterServer.join();
System.exit(0);
@@ -279,12 +288,13 @@ public class RemoteInterpreterServer extends Thread
className, Map<String, String> properties, String userName) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
- angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
+ angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);
hookRegistry = new InterpreterHookRegistry();
- resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
+ resourcePool = new DistributedResourcePool(interpreterGroup.getId(), intpEventClient);
interpreterGroup.setInterpreterHookRegistry(hookRegistry);
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
interpreterGroup.setResourcePool(resourcePool);
+ intpEventClient.setIntpGroupId(interpreterGroupId);
String localRepoPath = properties.get("zeppelin.interpreter.localRepo");
if (properties.containsKey("zeppelin.interpreter.output.limit")) {
@@ -327,8 +337,8 @@ public class RemoteInterpreterServer extends Thread
return resourcePool;
}
- protected RemoteInterpreterEventClient getEventClient() {
- return eventClient;
+ protected RemoteInterpreterEventClient getIntpEventClient() {
+ return intpEventClient;
}
private void setSystemProperty(Properties properties) {
@@ -388,7 +398,7 @@ public class RemoteInterpreterServer extends Thread
logger.info("Unload App {} ", appInfo.pkg.getName());
appInfo.app.unload();
// see ApplicationState.Status.UNLOADED
- eventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED");
+ intpEventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED");
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
}
@@ -739,32 +749,48 @@ public class RemoteInterpreterServer extends Thread
}
private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutput output) {
- List<InterpreterContextRunner> contextRunners = new LinkedList<>();
- List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
- new TypeToken<List<RemoteInterpreterContextRunner>>() {
- }.getType());
-
- if (runners != null) {
- for (InterpreterContextRunner r : runners) {
- contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
- }
- }
-
- return new InterpreterContext(
- ric.getNoteId(),
- ric.getParagraphId(),
- ric.getReplName(),
- ric.getParagraphTitle(),
- ric.getParagraphText(),
- AuthenticationInfo.fromJson(ric.getAuthenticationInfo()),
- (Map<String, Object>) gson.fromJson(ric.getConfig(),
- new TypeToken<Map<String, Object>>() {
- }.getType()),
- GUI.fromJson(ric.getGui()),
- GUI.fromJson(ric.getNoteGui()),
- interpreterGroup.getAngularObjectRegistry(),
- interpreterGroup.getResourcePool(),
- contextRunners, output, remoteWorksController, eventClient, progressMap);
+ // List<InterpreterContextRunner> contextRunners = new LinkedList<>();
+ // List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
+ // new TypeToken<List<RemoteInterpreterContextRunner>>() {
+ // }.getType());
+ //
+ // if (runners != null) {
+ // for (InterpreterContextRunner r : runners) {
+ // contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
+ // }
+ // }
+
+ return InterpreterContext.builder()
+ .setNoteId(ric.getNoteId())
+ .setParagraphId(ric.getParagraphId())
+ .setReplName(ric.getReplName())
+ .setParagraphTitle(ric.getParagraphTitle())
+ .setParagraphText(ric.getParagraphText())
+ .setAuthenticationInfo(AuthenticationInfo.fromJson(ric.getAuthenticationInfo()))
+ .setGUI(GUI.fromJson(ric.getGui()))
+ .setNoteGUI(GUI.fromJson(ric.getNoteGui()))
+ .setAngularObjectRegistry(interpreterGroup.getAngularObjectRegistry())
+ .setResourcePool(interpreterGroup.getResourcePool())
+ .setInterpreterOut(output)
+ .setIntpEventClient(intpEventClient)
+ .setProgressMap(progressMap)
+ .build();
+ // return new InterpreterContext(
+ // ric.getNoteId(),
+ // ric.getParagraphId(),
+ // ric.getReplName(),
+ // ric.getParagraphTitle(),
+ // ric.getParagraphText(),
+ // AuthenticationInfo.fromJson(ric.getAuthenticationInfo()),
+ // (Map<String, Object>) gson.fromJson(ric.getConfig(),
+ // new TypeToken<Map<String, Object>>() {
+ // }.getType()),
+ // GUI.fromJson(ric.getGui()),
+ // GUI.fromJson(ric.getNoteGui()),
+ // interpreterGroup.getAngularObjectRegistry(),
+ // interpreterGroup.getResourcePool(),
+ // output, intpEventClient,
+ // progressMap);
}
@@ -774,7 +800,7 @@ public class RemoteInterpreterServer extends Thread
@Override
public void onUpdateAll(InterpreterOutput out) {
try {
- eventClient.onInterpreterOutputUpdateAll(
+ intpEventClient.onInterpreterOutputUpdateAll(
noteId, paragraphId, out.toInterpreterResultMessage());
} catch (IOException e) {
logger.error(e.getMessage(), e);
@@ -785,7 +811,7 @@ public class RemoteInterpreterServer extends Thread
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
String output = new String(line);
logger.debug("Output Append: {}", output);
- eventClient.onInterpreterOutputAppend(
+ intpEventClient.onInterpreterOutputAppend(
noteId, paragraphId, index, output);
}
@@ -795,7 +821,7 @@ public class RemoteInterpreterServer extends Thread
try {
output = new String(out.toByteArray());
logger.debug("Output Update for index {}: {}", index, output);
- eventClient.onInterpreterOutputUpdate(
+ intpEventClient.onInterpreterOutputUpdate(
noteId, paragraphId, index, out.getType(), output);
} catch (IOException e) {
logger.error(e.getMessage(), e);
@@ -816,83 +842,10 @@ public class RemoteInterpreterServer extends Thread
@Override
public void run() {
- server.eventClient.run(this);
+// server.eventClient.run(this);
}
}
- static class ZeppelinRemoteWorksController implements RemoteWorksController {
- Logger logger = LoggerFactory.getLogger(ZeppelinRemoteWorksController.class);
-
- private final long DEFAULT_TIMEOUT_VALUE = 300000;
- private final Map<String, Object> remoteWorksResponsePool;
- private RemoteInterpreterServer server;
-
- ZeppelinRemoteWorksController(
- RemoteInterpreterServer server, Map<String, Object> remoteWorksResponsePool) {
- this.remoteWorksResponsePool = remoteWorksResponsePool;
- this.server = server;
- }
-
- public String generateOwnerKey() {
- String hashKeyText = new String("ownerKey" + System.currentTimeMillis());
- String hashKey = String.valueOf(hashKeyText.hashCode());
- return hashKey;
- }
-
- public boolean waitForEvent(String eventOwnerKey) throws InterruptedException {
- return waitForEvent(eventOwnerKey, DEFAULT_TIMEOUT_VALUE);
- }
-
- public boolean waitForEvent(String eventOwnerKey, long timeout) throws InterruptedException {
- boolean wasGetData = false;
- long now = System.currentTimeMillis();
- long endTime = System.currentTimeMillis() + timeout;
-
- while (endTime >= now) {
- synchronized (this.remoteWorksResponsePool) {
- wasGetData = this.remoteWorksResponsePool.containsKey(eventOwnerKey);
- }
- if (wasGetData == true) {
- break;
- }
- now = System.currentTimeMillis();
- sleep(500);
- }
-
- return wasGetData;
- }
-
- @Override
- public List<InterpreterContextRunner> getRemoteContextRunner(String noteId) {
- return getRemoteContextRunner(noteId, null);
- }
-
- public List<InterpreterContextRunner> getRemoteContextRunner(
- String noteId, String paragraphID) {
-
- List<InterpreterContextRunner> runners = null;
- String ownerKey = generateOwnerKey();
-
- ZeppelinServerResourceParagraphRunner resource = new ZeppelinServerResourceParagraphRunner();
- resource.setNoteId(noteId);
- resource.setParagraphId(paragraphID);
- server.eventClient.getZeppelinServerNoteRunner(ownerKey, resource);
-
- try {
- this.waitForEvent(ownerKey);
- } catch (Exception e) {
- return new LinkedList<>();
- }
- synchronized (this.remoteWorksResponsePool) {
- runners = (List<InterpreterContextRunner>) this.remoteWorksResponsePool.get(ownerKey);
- this.remoteWorksResponsePool.remove(ownerKey);
- }
- return runners;
- }
-
-
- }
-
private RemoteInterpreterResult convert(InterpreterResult result,
Map<String, Object> config, GUI gui, GUI noteGui) {
@@ -921,54 +874,30 @@ public class RemoteInterpreterServer extends Thread
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(sessionId);
if (interpreters == null) {
+ logger.info("getStatus:" + Status.UNKNOWN.name());
return Status.UNKNOWN.name();
}
//TODO(zjffdu) ineffient for loop interpreter and its jobs
for (Interpreter intp : interpreters) {
for (Job job : intp.getScheduler().getJobsRunning()) {
if (jobId.equals(job.getId())) {
+ logger.info("getStatus:" + job.getStatus().name());
return job.getStatus().name();
}
}
for (Job job : intp.getScheduler().getJobsWaiting()) {
if (jobId.equals(job.getId())) {
+ logger.info("getStatus:" + job.getStatus().name());
return job.getStatus().name();
}
}
}
}
+ logger.info("getStatus:" + Status.UNKNOWN.name());
return Status.UNKNOWN.name();
}
-
- @Override
- public void onAdd(String interpreterGroupId, AngularObject object) {
- eventClient.angularObjectAdd(object);
- }
-
- @Override
- public void onUpdate(String interpreterGroupId, AngularObject object) {
- eventClient.angularObjectUpdate(object);
- }
-
- @Override
- public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) {
- eventClient.angularObjectRemove(name, noteId, paragraphId);
- }
-
-
- /**
- * Poll event from RemoteInterpreterEventPoller
- *
- * @return
- * @throws TException
- */
- @Override
- public RemoteInterpreterEvent getEvent() throws TException {
- return eventClient.pollEvent();
- }
-
/**
* called when object is updated in client (web) side.
*
@@ -1069,25 +998,8 @@ public class RemoteInterpreterServer extends Thread
}
@Override
- public void resourcePoolResponseGetAll(List<String> resources) throws TException {
- eventClient.putResponseGetAllResources(resources);
- }
-
- /**
- * Get payload of resource from remote
- *
- * @param resourceId json serialized ResourceId
- * @param object java serialized of the object
- * @throws TException
- */
- @Override
- public void resourceResponseGet(String resourceId, ByteBuffer object) throws TException {
- eventClient.putResponseGetResource(resourceId, object);
- }
-
- @Override
public List<String> resourcePoolGetAll() throws TException {
- logger.debug("Request getAll from ZeppelinServer");
+ logger.debug("Request resourcePoolGetAll from ZeppelinServer");
List<String> result = new LinkedList<>();
if (resourcePool == null) {
@@ -1170,30 +1082,30 @@ public class RemoteInterpreterServer extends Thread
}
}
- /**
- * Get payload of resource from remote
- *
- * @param invokeResourceMethodEventMessage json serialized InvokeResourcemethodEventMessage
- * @param object java serialized of the object
- * @throws TException
- */
- @Override
- public void resourceResponseInvokeMethod(
- String invokeResourceMethodEventMessage, ByteBuffer object) throws TException {
- InvokeResourceMethodEventMessage message =
- InvokeResourceMethodEventMessage.fromJson(invokeResourceMethodEventMessage);
-
- if (message.shouldPutResultIntoResourcePool()) {
- Resource resource = resourcePool.get(
- message.resourceId.getNoteId(),
- message.resourceId.getParagraphId(),
- message.returnResourceName,
- true);
- eventClient.putResponseInvokeMethod(message, resource);
- } else {
- eventClient.putResponseInvokeMethod(message, object);
- }
- }
+ // /**
+ // * Get payload of resource from remote
+ // *
+ // * @param invokeResourceMethodEventMessage json serialized InvokeResourcemethodEventMessage
+ // * @param object java serialized of the object
+ // * @throws TException
+ // */
+ // @Override
+ // public void resourceResponseInvokeMethod(
+ // String invokeResourceMethodEventMessage, ByteBuffer object) throws TException {
+ // InvokeResourceMethodEventMessage message =
+ // InvokeResourceMethodEventMessage.fromJson(invokeResourceMethodEventMessage);
+ //
+ // if (message.shouldPutResultIntoResourcePool()) {
+ // Resource resource = resourcePool.get(
+ // message.resourceId.getNoteId(),
+ // message.resourceId.getParagraphId(),
+ // message.returnResourceName,
+ // true);
+ // eventClient.putResponseInvokeMethod(message, resource);
+ // } else {
+ // eventClient.putResponseInvokeMethod(message, object);
+ // }
+ // }
@Override
public void angularRegistryPush(String registryAsString) throws TException {
@@ -1219,13 +1131,13 @@ public class RemoteInterpreterServer extends Thread
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
- eventClient.onAppOutputAppend(noteId, paragraphId, index, appId, new String(line));
+ intpEventClient.onAppOutputAppend(noteId, paragraphId, index, appId, new String(line));
}
@Override
public void onUpdate(int index, InterpreterResultMessageOutput out) {
try {
- eventClient.onAppOutputUpdate(noteId, paragraphId, index, appId,
+ intpEventClient.onAppOutputUpdate(noteId, paragraphId, index, appId,
out.getType(), new String(out.toByteArray()));
} catch (IOException e) {
logger.error(e.getMessage(), e);
@@ -1318,7 +1230,7 @@ public class RemoteInterpreterServer extends Thread
runningApp.app.run(resource);
context.out.flush();
InterpreterResultMessageOutput out = context.out.getOutputAt(0);
- eventClient.onAppOutputUpdate(
+ intpEventClient.onAppOutputUpdate(
context.getNoteId(),
context.getParagraphId(),
0,
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 223588f..cf82247 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -17,6 +17,12 @@
package org.apache.zeppelin.interpreter.remote;
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.net.ConnectException;
import java.net.Inet4Address;
@@ -30,20 +36,6 @@ import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
-import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
-
/**
*
*/
@@ -130,14 +122,14 @@ public class RemoteInterpreterUtils {
// end point is not accessible
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " +
- "(might be initializing): " + cne.getMessage());
+ "(might be initializing): " + cne.getMessage());
}
return false;
} catch (IOException ioe) {
// end point is not accessible
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " +
- "(might be initializing): " + ioe.getMessage());
+ "(might be initializing): " + ioe.getMessage());
}
return false;
}
@@ -160,16 +152,4 @@ public class RemoteInterpreterUtils {
return key.matches("^[A-Z_0-9]*");
}
- public static void registerInterpreter(String callbackHost, int callbackPort,
- final CallbackInfo callbackInfo) throws TException {
- LOGGER.info("callbackHost: {}, callbackPort: {}, callbackInfo: {}", callbackHost, callbackPort,
- callbackInfo);
- try (TTransport transport = new TSocket(callbackHost, callbackPort)) {
- transport.open();
- TProtocol protocol = new TBinaryProtocol(transport);
- RemoteInterpreterCallbackService.Client client = new RemoteInterpreterCallbackService.Client(
- protocol);
- client.callback(callbackInfo);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/7af86168/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
new file mode 100644
index 0000000..ccf9316
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2018-5-29")
+public class AngularObjectId implements org.apache.thrift.TBase<AngularObjectId, AngularObjectId._Fields>, java.io.Serializable, Cloneable, Comparable<AngularObjectId> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AngularObjectId");
+
+ private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2);
+ private static final org.apache.thrift.protocol.TField NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("name", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new AngularObjectIdStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new AngularObjectIdTupleSchemeFactory());
+ }
+
+ public String noteId; // required
+ public String paragraphId; // required
+ public String name; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ NOTE_ID((short)1, "noteId"),
+ PARAGRAPH_ID((short)2, "paragraphId"),
+ NAME((short)3, "name");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // NOTE_ID
+ return NOTE_ID;
+ case 2: // PARAGRAPH_ID
+ return PARAGRAPH_ID;
+ case 3: // NAME
+ return NAME;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.NAME, new org.apache.thrift.meta_data.FieldMetaData("name", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AngularObjectId.class, metaDataMap);
+ }
+
+ public AngularObjectId() {
+ }
+
+ public AngularObjectId(
+ String noteId,
+ String paragraphId,
+ String name)
+ {
+ this();
+ this.noteId = noteId;
+ this.paragraphId = paragraphId;
+ this.name = name;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public AngularObjectId(AngularObjectId other) {
+ if (other.isSetNoteId()) {
+ this.noteId = other.noteId;
+ }
+ if (other.isSetParagraphId()) {
+ this.paragraphId = other.paragraphId;
+ }
+ if (other.isSetName()) {
+ this.name = other.name;
+ }
+ }
+
+ public AngularObjectId deepCopy() {
+ return new AngularObjectId(this);
+ }
+
+ @Override
+ public void clear() {
+ this.noteId = null;
+ this.paragraphId = null;
+ this.name = null;
+ }
+
+ public String getNoteId() {
+ return this.noteId;
+ }
+
+ public AngularObjectId setNoteId(String noteId) {
+ this.noteId = noteId;
+ return this;
+ }
+
+ public void unsetNoteId() {
+ this.noteId = null;
+ }
+
+ /** Returns true if field noteId is set (has been assigned a value) and false otherwise */
+ public boolean isSetNoteId() {
+ return this.noteId != null;
+ }
+
+ public void setNoteIdIsSet(boolean value) {
+ if (!value) {
+ this.noteId = null;
+ }
+ }
+
+ public String getParagraphId() {
+ return this.paragraphId;
+ }
+
+ public AngularObjectId setParagraphId(String paragraphId) {
+ this.paragraphId = paragraphId;
+ return this;
+ }
+
+ public void unsetParagraphId() {
+ this.paragraphId = null;
+ }
+
+ /** Returns true if field paragraphId is set (has been assigned a value) and false otherwise */
+ public boolean isSetParagraphId() {
+ return this.paragraphId != null;
+ }
+
+ public void setParagraphIdIsSet(boolean value) {
+ if (!value) {
+ this.paragraphId = null;
+ }
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public AngularObjectId setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public void unsetName() {
+ this.name = null;
+ }
+
+ /** Returns true if field name is set (has been assigned a value) and false otherwise */
+ public boolean isSetName() {
+ return this.name != null;
+ }
+
+ public void setNameIsSet(boolean value) {
+ if (!value) {
+ this.name = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case NOTE_ID:
+ if (value == null) {
+ unsetNoteId();
+ } else {
+ setNoteId((String)value);
+ }
+ break;
+
+ case PARAGRAPH_ID:
+ if (value == null) {
+ unsetParagraphId();
+ } else {
+ setParagraphId((String)value);
+ }
+ break;
+
+ case NAME:
+ if (value == null) {
+ unsetName();
+ } else {
+ setName((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case NOTE_ID:
+ return getNoteId();
+
+ case PARAGRAPH_ID:
+ return getParagraphId();
+
+ case NAME:
+ return getName();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case NOTE_ID:
+ return isSetNoteId();
+ case PARAGRAPH_ID:
+ return isSetParagraphId();
+ case NAME:
+ return isSetName();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof AngularObjectId)
+ return this.equals((AngularObjectId)that);
+ return false;
+ }
+
+ public boolean equals(AngularObjectId that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_noteId = true && this.isSetNoteId();
+ boolean that_present_noteId = true && that.isSetNoteId();
+ if (this_present_noteId || that_present_noteId) {
+ if (!(this_present_noteId && that_present_noteId))
+ return false;
+ if (!this.noteId.equals(that.noteId))
+ return false;
+ }
+
+ boolean this_present_paragraphId = true && this.isSetParagraphId();
+ boolean that_present_paragraphId = true && that.isSetParagraphId();
+ if (this_present_paragraphId || that_present_paragraphId) {
+ if (!(this_present_paragraphId && that_present_paragraphId))
+ return false;
+ if (!this.paragraphId.equals(that.paragraphId))
+ return false;
+ }
+
+ boolean this_present_name = true && this.isSetName();
+ boolean that_present_name = true && that.isSetName();
+ if (this_present_name || that_present_name) {
+ if (!(this_present_name && that_present_name))
+ return false;
+ if (!this.name.equals(that.name))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_noteId = true && (isSetNoteId());
+ list.add(present_noteId);
+ if (present_noteId)
+ list.add(noteId);
+
+ boolean present_paragraphId = true && (isSetParagraphId());
+ list.add(present_paragraphId);
+ if (present_paragraphId)
+ list.add(paragraphId);
+
+ boolean present_name = true && (isSetName());
+ list.add(present_name);
+ if (present_name)
+ list.add(name);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(AngularObjectId other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetNoteId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetParagraphId()).compareTo(other.isSetParagraphId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetParagraphId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphId, other.paragraphId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetName()).compareTo(other.isSetName());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetName()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.name, other.name);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("AngularObjectId(");
+ boolean first = true;
+
+ sb.append("noteId:");
+ if (this.noteId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.noteId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("paragraphId:");
+ if (this.paragraphId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.paragraphId);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("name:");
+ if (this.name == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.name);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class AngularObjectIdStandardSchemeFactory implements SchemeFactory {
+ public AngularObjectIdStandardScheme getScheme() {
+ return new AngularObjectIdStandardScheme();
+ }
+ }
+
+ private static class AngularObjectIdStandardScheme extends StandardScheme<AngularObjectId> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, AngularObjectId struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // NOTE_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.noteId = iprot.readString();
+ struct.setNoteIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // PARAGRAPH_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.paragraphId = iprot.readString();
+ struct.setParagraphIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // NAME
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.name = iprot.readString();
+ struct.setNameIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, AngularObjectId struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.noteId != null) {
+ oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
+ oprot.writeString(struct.noteId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.paragraphId != null) {
+ oprot.writeFieldBegin(PARAGRAPH_ID_FIELD_DESC);
+ oprot.writeString(struct.paragraphId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.name != null) {
+ oprot.writeFieldBegin(NAME_FIELD_DESC);
+ oprot.writeString(struct.name);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class AngularObjectIdTupleSchemeFactory implements SchemeFactory {
+ public AngularObjectIdTupleScheme getScheme() {
+ return new AngularObjectIdTupleScheme();
+ }
+ }
+
+ private static class AngularObjectIdTupleScheme extends TupleScheme<AngularObjectId> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, AngularObjectId struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetNoteId()) {
+ optionals.set(0);
+ }
+ if (struct.isSetParagraphId()) {
+ optionals.set(1);
+ }
+ if (struct.isSetName()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
+ if (struct.isSetNoteId()) {
+ oprot.writeString(struct.noteId);
+ }
+ if (struct.isSetParagraphId()) {
+ oprot.writeString(struct.paragraphId);
+ }
+ if (struct.isSetName()) {
+ oprot.writeString(struct.name);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, AngularObjectId struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(3);
+ if (incoming.get(0)) {
+ struct.noteId = iprot.readString();
+ struct.setNoteIdIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.paragraphId = iprot.readString();
+ struct.setParagraphIdIsSet(true);
+ }
+ if (incoming.get(2)) {
+ struct.name = iprot.readString();
+ struct.setNameIsSet(true);
+ }
+ }
+ }
+
+}
+