You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/06 22:30:01 UTC
zeppelin git commit: ZEPPELIN-2645. Adding way to register
RemoteInterpreterServer's port into InterpreterProcess
Repository: zeppelin
Updated Branches:
refs/heads/master b14f5ab95 -> 1812928bf
ZEPPELIN-2645. Adding way to register RemoteInterpreterServer's port into InterpreterProcess
### What is this PR for?
Rebase of PR #2418; this still uses Thrift as the communication protocol between the Zeppelin server and the interpreter process. We can change it to Netty in the future, when we implement a two-way communication channel between the Zeppelin server and the interpreter process.
### What type of PR is it?
[Bug Fix | Improvement | Feature | Documentation | Hot Fix | Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2645
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jongyoul Lee <jo...@gmail.com>
Closes #2562 from zjffdu/ZEPPELIN-2645 and squashes the following commits:
82bd8d0 [Jongyoul Lee] ZEPPELIN-2645. Adding way to register RemoteInterpreterServer's port into InterpreterProcess
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1812928b
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1812928b
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1812928b
Branch: refs/heads/master
Commit: 1812928bfbecb0cb685fe3233e65ef9d8c84f73f
Parents: b14f5ab
Author: Jongyoul Lee <jo...@gmail.com>
Authored: Wed Jun 14 19:43:49 2017 +0900
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Sep 7 06:29:55 2017 +0800
----------------------------------------------------------------------
bin/interpreter.sh | 13 +-
conf/zeppelin-site.xml.template | 6 +
.../zeppelin/helium/ZeppelinDevServer.java | 4 +-
.../remote/RemoteInterpreterServer.java | 148 +++-
.../remote/RemoteInterpreterUtils.java | 98 ++-
.../interpreter/thrift/CallbackInfo.java | 518 +++++++++++
.../RemoteInterpreterCallbackService.java | 879 +++++++++++++++++++
.../main/thrift/RemoteInterpreterService.thrift | 9 +
.../remote/RemoteInterpreterServerTest.java | 6 +-
.../remote/RemoteInterpreterUtilsTest.java | 11 +
.../zeppelin/conf/ZeppelinConfiguration.java | 8 +-
.../interpreter/InterpreterSetting.java | 3 +-
.../remote/RemoteInterpreterManagedProcess.java | 129 ++-
.../apache/zeppelin/notebook/NotebookTest.java | 6 +-
14 files changed, 1754 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 1344e31..fd93a06 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -23,7 +23,7 @@ function usage() {
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local interpreter repo dir to load> -g <interpreter group name>"
}
-while getopts "hp:d:l:v:u:g:" o; do
+while getopts "hc:p:d:l:v:u:g:" o; do
case ${o} in
h)
usage
@@ -32,8 +32,11 @@ while getopts "hp:d:l:v:u:g:" o; do
d)
INTERPRETER_DIR=${OPTARG}
;;
+ c)
+ CALLBACK_HOST=${OPTARG} # This will be used callback host
+ ;;
p)
- PORT=${OPTARG}
+ PORT=${OPTARG} # This will be used callback port
;;
l)
LOCAL_INTERPRETER_REPO=${OPTARG}
@@ -202,12 +205,12 @@ fi
if [[ -n "${SPARK_SUBMIT}" ]]; then
if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then
- INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}`
+ INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
else
- INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${PORT}`
+ INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT}`
fi
else
- INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} `
+ INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} `
fi
if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 3ec6e27..ce3ffaa 100755
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -430,4 +430,10 @@
<description>The HTTP X-XSS-Protection response header is a feature of Internet Explorer, Chrome and Safari that stops pages from loading when they detect reflected cross-site scripting (XSS) attacks. When value is set to 1 and a cross-site scripting attack is detected, the browser will sanitize the page (remove the unsafe parts).</description>
</property>
-->
+<!--
+<property>
+ <name>zeppelin.interpreter.callback.portRange</name>
+ <value>10000:10010</value>
+</property>
+-->
</configuration>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
----------------------------------------------------------------------
diff --git a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
index 2484469..3a5199d 100644
--- a/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
+++ b/helium-dev/src/main/java/org/apache/zeppelin/helium/ZeppelinDevServer.java
@@ -38,8 +38,8 @@ public class ZeppelinDevServer extends
private DevInterpreter interpreter = null;
private InterpreterOutput out;
- public ZeppelinDevServer(int port) throws TException {
- super(port);
+ public ZeppelinDevServer(int port) throws TException, IOException {
+ super(null, port);
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 6925360..7f476e8 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -17,29 +17,55 @@
package org.apache.zeppelin.interpreter.remote;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.dep.DependencyResolver;
-import org.apache.zeppelin.display.*;
-import org.apache.zeppelin.helium.*;
-import org.apache.zeppelin.interpreter.*;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.helium.Application;
+import org.apache.zeppelin.helium.ApplicationContext;
+import org.apache.zeppelin.helium.ApplicationException;
+import org.apache.zeppelin.helium.ApplicationLoader;
+import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry;
+import org.apache.zeppelin.helium.HeliumPackage;
+import org.apache.zeppelin.interpreter.Constants;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterHookListener;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
+import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterOutputListener;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.interpreter.thrift.*;
-import org.apache.zeppelin.resource.*;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.RemoteWorksController;
+import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
+import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
+import org.apache.zeppelin.resource.DistributedResourcePool;
+import org.apache.zeppelin.resource.Resource;
+import org.apache.zeppelin.resource.ResourcePool;
+import org.apache.zeppelin.resource.ResourceSet;
+import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@@ -49,8 +75,22 @@ import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
* Entry point for Interpreter process.
@@ -70,6 +110,9 @@ public class RemoteInterpreterServer
Gson gson = new Gson();
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
+ private String callbackHost;
+ private int callbackPort;
+ private String host;
private int port;
private TThreadPoolServer server;
@@ -87,11 +130,34 @@ public class RemoteInterpreterServer
// Hold information for manual progress update
private ConcurrentMap<String, Integer> progressMap = new ConcurrentHashMap<>();
- public RemoteInterpreterServer(int port) throws TTransportException {
- this.port = port;
+ private boolean isTest;
+
+ public RemoteInterpreterServer(String callbackHost, int port) throws IOException,
+ TTransportException {
+ this(callbackHost, port, false);
+ }
+
+ public RemoteInterpreterServer(String callbackHost, int port, boolean isTest)
+ throws TTransportException, IOException {
+ if (null != callbackHost) {
+ this.callbackHost = callbackHost;
+ this.callbackPort = port;
+ } else {
+ // DevInterpreter
+ this.port = port;
+ }
+ this.isTest = isTest;
processor = new RemoteInterpreterService.Processor<>(this);
- TServerSocket serverTransport = new TServerSocket(port);
+ TServerSocket serverTransport;
+ if (null == callbackHost) {
+ // Dev Interpreter
+ serverTransport = new TServerSocket(port);
+ } else {
+ this.port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ this.host = RemoteInterpreterUtils.findAvailableHostAddress();
+ serverTransport = new TServerSocket(this.port);
+ }
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
@@ -100,6 +166,36 @@ public class RemoteInterpreterServer
@Override
public void run() {
+ if (null != callbackHost && !isTest) {
+ new Thread(new Runnable() {
+ boolean interrupted = false;
+ @Override
+ public void run() {
+ while (!interrupted && !server.isServing()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+
+ if (!interrupted) {
+ CallbackInfo callbackInfo = new CallbackInfo(host, port);
+ try {
+ RemoteInterpreterUtils
+ .registerInterpreter(callbackHost, callbackPort, callbackInfo);
+ } catch (TException e) {
+ logger.error("Error while registering interpreter: {}", callbackInfo, e);
+ try {
+ shutdown();
+ } catch (TException e1) {
+ logger.warn("Exception occurs while shutting down", e1);
+ }
+ }
+ }
+ }
+ }).start();
+ }
logger.info("Starting remote interpreter server on port {}", port);
server.serve();
}
@@ -151,13 +247,15 @@ public class RemoteInterpreterServer
public static void main(String[] args)
- throws TTransportException, InterruptedException {
-
+ throws TTransportException, InterruptedException, IOException {
+ String callbackHost = null;
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
if (args.length > 0) {
- port = Integer.parseInt(args[0]);
+ callbackHost = args[0];
+ port = Integer.parseInt(args[1]);
}
- RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port);
+ RemoteInterpreterServer remoteInterpreterServer =
+ new RemoteInterpreterServer(callbackHost, port);
remoteInterpreterServer.start();
remoteInterpreterServer.join();
System.exit(0);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 4ee6690..835199a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -17,27 +17,96 @@
package org.apache.zeppelin.interpreter.remote;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.ConnectException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.InterfaceAddress;
+import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
/**
*
*/
public class RemoteInterpreterUtils {
static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);
+
+
public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
- int port;
- try (ServerSocket socket = new ServerSocket(0);) {
- port = socket.getLocalPort();
- socket.close();
+ return findRandomAvailablePortOnAllLocalInterfaces(":");
+ }
+
+ /**
+ * start:end
+ *
+ * @param portRange
+ * @return
+ * @throws IOException
+ */
+ public static int findRandomAvailablePortOnAllLocalInterfaces(String portRange)
+ throws IOException {
+
+ // ':' is the default value which means no constraints on the portRange
+ if (portRange == null || portRange.equals(":")) {
+ int port;
+ try (ServerSocket socket = new ServerSocket(0);) {
+ port = socket.getLocalPort();
+ socket.close();
+ }
+ return port;
}
- return port;
+ // valid user registered port https://en.wikipedia.org/wiki/Registered_port
+ int start = 1024;
+ int end = 49151;
+ String[] ports = portRange.split(":", -1);
+ if (!ports[0].isEmpty()) {
+ start = Integer.parseInt(ports[0]);
+ }
+ if (!ports[1].isEmpty()) {
+ end = Integer.parseInt(ports[1]);
+ }
+ for (int i = start; i <= end; ++i) {
+ try {
+ ServerSocket socket = new ServerSocket(i);
+ return socket.getLocalPort();
+ } catch (Exception e) {
+ // ignore this
+ }
+ }
+ throw new IOException("No available port in the portRange: " + portRange);
+ }
+
+ public static String findAvailableHostAddress() throws UnknownHostException, SocketException {
+ InetAddress address = InetAddress.getLocalHost();
+ if (address.isLoopbackAddress()) {
+ for (NetworkInterface networkInterface : Collections
+ .list(NetworkInterface.getNetworkInterfaces())) {
+ if (!networkInterface.isLoopback()) {
+ for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) {
+ InetAddress a = interfaceAddress.getAddress();
+ if (a instanceof Inet4Address) {
+ return a.getHostAddress();
+ }
+ }
+ }
+ }
+ }
+ return address.getHostAddress();
}
public static boolean checkIfRemoteEndpointAccessible(String host, int port) {
@@ -80,4 +149,17 @@ public class RemoteInterpreterUtils {
return key.matches("^[A-Z_0-9]*");
}
+
+ public static void registerInterpreter(String callbackHost, int callbackPort,
+ final CallbackInfo callbackInfo) throws TException {
+ LOGGER.info("callbackHost: {}, callbackPort: {}, callbackInfo: {}", callbackHost, callbackPort,
+ callbackInfo);
+ try (TTransport transport = new TSocket(callbackHost, callbackPort)) {
+ transport.open();
+ TProtocol protocol = new TBinaryProtocol(transport);
+ RemoteInterpreterCallbackService.Client client = new RemoteInterpreterCallbackService.Client(
+ protocol);
+ client.callback(callbackInfo);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java
new file mode 100644
index 0000000..b0c7e9a
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/CallbackInfo.java
@@ -0,0 +1,518 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-6-17")
+public class CallbackInfo implements org.apache.thrift.TBase<CallbackInfo, CallbackInfo._Fields>, java.io.Serializable, Cloneable, Comparable<CallbackInfo> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CallbackInfo");
+
+ private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new CallbackInfoStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new CallbackInfoTupleSchemeFactory());
+ }
+
+ public String host; // required
+ public int port; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ HOST((short)1, "host"),
+ PORT((short)2, "port");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // HOST
+ return HOST;
+ case 2: // PORT
+ return PORT;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __PORT_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CallbackInfo.class, metaDataMap);
+ }
+
+ public CallbackInfo() {
+ }
+
+ public CallbackInfo(
+ String host,
+ int port)
+ {
+ this();
+ this.host = host;
+ this.port = port;
+ setPortIsSet(true);
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public CallbackInfo(CallbackInfo other) {
+ __isset_bitfield = other.__isset_bitfield;
+ if (other.isSetHost()) {
+ this.host = other.host;
+ }
+ this.port = other.port;
+ }
+
+ public CallbackInfo deepCopy() {
+ return new CallbackInfo(this);
+ }
+
+ @Override
+ public void clear() {
+ this.host = null;
+ setPortIsSet(false);
+ this.port = 0;
+ }
+
+ public String getHost() {
+ return this.host;
+ }
+
+ public CallbackInfo setHost(String host) {
+ this.host = host;
+ return this;
+ }
+
+ public void unsetHost() {
+ this.host = null;
+ }
+
+ /** Returns true if field host is set (has been assigned a value) and false otherwise */
+ public boolean isSetHost() {
+ return this.host != null;
+ }
+
+ public void setHostIsSet(boolean value) {
+ if (!value) {
+ this.host = null;
+ }
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public CallbackInfo setPort(int port) {
+ this.port = port;
+ setPortIsSet(true);
+ return this;
+ }
+
+ public void unsetPort() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID);
+ }
+
+ /** Returns true if field port is set (has been assigned a value) and false otherwise */
+ public boolean isSetPort() {
+ return EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
+ }
+
+ public void setPortIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value);
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case HOST:
+ if (value == null) {
+ unsetHost();
+ } else {
+ setHost((String)value);
+ }
+ break;
+
+ case PORT:
+ if (value == null) {
+ unsetPort();
+ } else {
+ setPort((Integer)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case HOST:
+ return getHost();
+
+ case PORT:
+ return Integer.valueOf(getPort());
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case HOST:
+ return isSetHost();
+ case PORT:
+ return isSetPort();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof CallbackInfo)
+ return this.equals((CallbackInfo)that);
+ return false;
+ }
+
+ public boolean equals(CallbackInfo that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_host = true && this.isSetHost();
+ boolean that_present_host = true && that.isSetHost();
+ if (this_present_host || that_present_host) {
+ if (!(this_present_host && that_present_host))
+ return false;
+ if (!this.host.equals(that.host))
+ return false;
+ }
+
+ boolean this_present_port = true;
+ boolean that_present_port = true;
+ if (this_present_port || that_present_port) {
+ if (!(this_present_port && that_present_port))
+ return false;
+ if (this.port != that.port)
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ List<Object> list = new ArrayList<Object>();
+
+ boolean present_host = true && (isSetHost());
+ list.add(present_host);
+ if (present_host)
+ list.add(host);
+
+ boolean present_port = true;
+ list.add(present_port);
+ if (present_port)
+ list.add(port);
+
+ return list.hashCode();
+ }
+
+ @Override
+ public int compareTo(CallbackInfo other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetHost()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetPort()).compareTo(other.isSetPort());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetPort()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("CallbackInfo(");
+ boolean first = true;
+
+ sb.append("host:");
+ if (this.host == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.host);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("port:");
+ sb.append(this.port);
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ /**
+  * Java-serialization hook: decodes this struct from {@code in} with the Thrift compact protocol.
+  *
+  * @param in object stream supplied by the serialization runtime
+  * @throws java.io.IOException wrapping any {@code TException} raised while decoding
+  * @throws ClassNotFoundException declared by the hook's signature; not thrown by this body
+  */
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+   try {
+     // Java deserialization bypasses the default constructor, so the isset bits are reset by hand.
+     __isset_bitfield = 0;
+     org.apache.thrift.transport.TIOStreamTransport source =
+         new org.apache.thrift.transport.TIOStreamTransport(in);
+     org.apache.thrift.protocol.TCompactProtocol decoder =
+         new org.apache.thrift.protocol.TCompactProtocol(source);
+     read(decoder);
+   } catch (org.apache.thrift.TException te) {
+     throw new java.io.IOException(te);
+   }
+ }
+
+ /** Factory that hands out a fresh {@link CallbackInfoStandardScheme} on every call. */
+ private static class CallbackInfoStandardSchemeFactory implements SchemeFactory {
+   public CallbackInfoStandardScheme getScheme() {
+     final CallbackInfoStandardScheme scheme = new CallbackInfoStandardScheme();
+     return scheme;
+   }
+ }
+
+ /**
+  * Field-tagged encoding of {@link CallbackInfo}: field id 1 carries the host string and
+  * field id 2 carries the i32 port.
+  */
+ private static class CallbackInfoStandardScheme extends StandardScheme<CallbackInfo> {
+
+   /**
+    * Reads fields into {@code struct} until the STOP marker. A field with an unknown id, or a
+    * known id with an unexpected wire type, is skipped.
+    */
+   public void read(org.apache.thrift.protocol.TProtocol iprot, CallbackInfo struct) throws org.apache.thrift.TException {
+     iprot.readStructBegin();
+     for (org.apache.thrift.protocol.TField header = iprot.readFieldBegin();
+         header.type != org.apache.thrift.protocol.TType.STOP;
+         header = iprot.readFieldBegin()) {
+       final boolean isHost = header.id == 1 && header.type == org.apache.thrift.protocol.TType.STRING;
+       final boolean isPort = header.id == 2 && header.type == org.apache.thrift.protocol.TType.I32;
+       if (isHost) {
+         struct.host = iprot.readString();
+         struct.setHostIsSet(true);
+       } else if (isPort) {
+         struct.port = iprot.readI32();
+         struct.setPortIsSet(true);
+       } else {
+         // unknown id, or a known id whose wire type does not match
+         org.apache.thrift.protocol.TProtocolUtil.skip(iprot, header.type);
+       }
+       iprot.readFieldEnd();
+     }
+     iprot.readStructEnd();
+
+     // check for required fields of primitive type, which can't be checked in the validate method
+     struct.validate();
+   }
+
+   /**
+    * Writes {@code struct} after validating it. The host field is emitted only when non-null;
+    * the port field is always emitted.
+    */
+   public void write(org.apache.thrift.protocol.TProtocol oprot, CallbackInfo struct) throws org.apache.thrift.TException {
+     struct.validate();
+
+     oprot.writeStructBegin(STRUCT_DESC);
+
+     final String hostValue = struct.host;
+     if (hostValue != null) {
+       oprot.writeFieldBegin(HOST_FIELD_DESC);
+       oprot.writeString(hostValue);
+       oprot.writeFieldEnd();
+     }
+
+     oprot.writeFieldBegin(PORT_FIELD_DESC);
+     oprot.writeI32(struct.port);
+     oprot.writeFieldEnd();
+
+     oprot.writeFieldStop();
+     oprot.writeStructEnd();
+   }
+
+ }
+
+ /** Factory that hands out a fresh {@link CallbackInfoTupleScheme} on every call. */
+ private static class CallbackInfoTupleSchemeFactory implements SchemeFactory {
+   public CallbackInfoTupleScheme getScheme() {
+     final CallbackInfoTupleScheme scheme = new CallbackInfoTupleScheme();
+     return scheme;
+   }
+ }
+
+ /**
+  * Tuple encoding of {@link CallbackInfo}: a 2-bit presence set (bit 0 = host, bit 1 = port)
+  * followed by the values of the fields that are present, in that order.
+  */
+ private static class CallbackInfoTupleScheme extends TupleScheme<CallbackInfo> {
+
+   /** Writes the presence bits, then host and port for whichever of them is set. */
+   @Override
+   public void write(org.apache.thrift.protocol.TProtocol prot, CallbackInfo struct) throws org.apache.thrift.TException {
+     final TTupleProtocol tuple = (TTupleProtocol) prot;
+     final boolean hasHost = struct.isSetHost();
+     final boolean hasPort = struct.isSetPort();
+
+     final BitSet present = new BitSet();
+     present.set(0, hasHost);
+     present.set(1, hasPort);
+     tuple.writeBitSet(present, 2);
+
+     if (hasHost) {
+       tuple.writeString(struct.host);
+     }
+     if (hasPort) {
+       tuple.writeI32(struct.port);
+     }
+   }
+
+   /** Reads the presence bits, then fills host and port for whichever bit is on. */
+   @Override
+   public void read(org.apache.thrift.protocol.TProtocol prot, CallbackInfo struct) throws org.apache.thrift.TException {
+     final TTupleProtocol tuple = (TTupleProtocol) prot;
+     final BitSet present = tuple.readBitSet(2);
+     if (present.get(0)) {
+       struct.host = tuple.readString();
+       struct.setHostIsSet(true);
+     }
+     if (present.get(1)) {
+       struct.port = tuple.readI32();
+       struct.setPortIsSet(true);
+     }
+   }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java
new file mode 100644
index 0000000..6ef08f6
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterCallbackService.java
@@ -0,0 +1,879 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2017-6-17")
+public class RemoteInterpreterCallbackService {
+
+ /** Synchronous contract of the callback service. */
+ public interface Iface {
+
+   /**
+    * Hands {@code callbackInfo} to the service implementation.
+    *
+    * @param callbackInfo the struct being reported
+    * @throws TException if the call fails
+    */
+   void callback(CallbackInfo callbackInfo) throws TException;
+
+ }
+
+ /** Asynchronous contract of the callback service. */
+ public interface AsyncIface {
+
+   /**
+    * Starts a callback call; the outcome is reported through {@code resultHandler}.
+    *
+    * @param callbackInfo the struct being reported
+    * @param resultHandler receiver of the completion or error notification
+    * @throws TException if the call cannot be started
+    */
+   void callback(CallbackInfo callbackInfo, AsyncMethodCallback resultHandler) throws TException;
+
+ }
+
+ /** Blocking client: every call sends the request and then waits for the reply. */
+ public static class Client extends org.apache.thrift.TServiceClient implements Iface {
+
+   /** Creates {@link Client} instances from one or two protocols. */
+   public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
+     public Factory() {}
+
+     public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
+       final Client created = new Client(prot);
+       return created;
+     }
+
+     public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+       final Client created = new Client(iprot, oprot);
+       return created;
+     }
+   }
+
+   /** Uses {@code prot} for both directions. */
+   public Client(org.apache.thrift.protocol.TProtocol prot) {
+     super(prot, prot);
+   }
+
+   /** Uses {@code iprot} for input and {@code oprot} for output. */
+   public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+     super(iprot, oprot);
+   }
+
+   /** Sends the {@code callback} request and blocks until its reply has been received. */
+   public void callback(CallbackInfo callbackInfo) throws TException {
+     send_callback(callbackInfo);
+     recv_callback();
+   }
+
+   /** Sends the {@code callback} request carrying {@code callbackInfo}. */
+   public void send_callback(CallbackInfo callbackInfo) throws TException {
+     sendBase("callback", new callback_args().setCallbackInfo(callbackInfo));
+   }
+
+   /** Receives the (field-less) reply of a previously sent {@code callback} request. */
+   public void recv_callback() throws TException {
+     receiveBase(new callback_result(), "callback");
+   }
+
+ }
+ /** Non-blocking client that submits calls through a {@code TAsyncClientManager}. */
+ public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
+
+   /** Creates {@link AsyncClient} instances sharing one manager and one protocol factory. */
+   public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
+     private org.apache.thrift.async.TAsyncClientManager clientManager;
+     private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
+
+     public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+       this.protocolFactory = protocolFactory;
+       this.clientManager = clientManager;
+     }
+
+     public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
+       final AsyncClient created = new AsyncClient(protocolFactory, clientManager, transport);
+       return created;
+     }
+   }
+
+   public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
+     super(protocolFactory, clientManager, transport);
+   }
+
+   /** Registers a new {@code callback} call as the current method and submits it to the manager. */
+   public void callback(CallbackInfo callbackInfo, AsyncMethodCallback resultHandler) throws TException {
+     checkReady();
+     final callback_call pending =
+         new callback_call(callbackInfo, resultHandler, this, ___protocolFactory, ___transport);
+     this.___currentMethod = pending;
+     ___manager.call(pending);
+   }
+
+   /** One in-flight {@code callback} invocation. */
+   public static class callback_call extends org.apache.thrift.async.TAsyncMethodCall {
+     private CallbackInfo callbackInfo;
+
+     public callback_call(CallbackInfo callbackInfo, AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws TException {
+       super(client, protocolFactory, transport, resultHandler, false);
+       this.callbackInfo = callbackInfo;
+     }
+
+     /** Writes the CALL message (sequence id 0) with the arguments struct onto {@code prot}. */
+     public void write_args(org.apache.thrift.protocol.TProtocol prot) throws TException {
+       final org.apache.thrift.protocol.TMessage header =
+           new org.apache.thrift.protocol.TMessage("callback", org.apache.thrift.protocol.TMessageType.CALL, 0);
+       prot.writeMessageBegin(header);
+       new callback_args().setCallbackInfo(callbackInfo).write(prot);
+       prot.writeMessageEnd();
+     }
+
+     /**
+      * Decodes the buffered response with a throw-away blocking {@link Client}.
+      *
+      * @throws IllegalStateException if the response has not been fully read yet
+      */
+     public void getResult() throws TException {
+       final boolean responseRead =
+           getState() == org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ;
+       if (!responseRead) {
+         throw new IllegalStateException("Method call not finished!");
+       }
+       final org.apache.thrift.transport.TMemoryInputTransport buffered =
+           new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+       final org.apache.thrift.protocol.TProtocol replyProtocol =
+           client.getProtocolFactory().getProtocol(buffered);
+       new Client(replyProtocol).recv_callback();
+     }
+   }
+
+ }
+
+ /** Server-side dispatcher that routes the {@code callback} message to an {@link Iface}. */
+ public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
+   private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class.getName());
+
+   public Processor(I iface) {
+     super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
+   }
+
+   /** Variant that adds this service's functions to a caller-supplied map. */
+   protected Processor(I iface, Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
+     super(iface, getProcessMap(processMap));
+   }
+
+   /** Registers the {@code callback} function in {@code processMap} and returns the same map. */
+   private static <I extends Iface> Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(Map<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
+     processMap.put("callback", new callback());
+     return processMap;
+   }
+
+   /** Process function for the two-way {@code callback} method. */
+   public static class callback<I extends Iface> extends org.apache.thrift.ProcessFunction<I, callback_args> {
+     public callback() {
+       super("callback");
+     }
+
+     public callback_args getEmptyArgsInstance() {
+       final callback_args blank = new callback_args();
+       return blank;
+     }
+
+     protected boolean isOneway() {
+       return false;
+     }
+
+     /** Invokes the handler with the decoded argument and returns the empty result struct. */
+     public callback_result getResult(I iface, callback_args args) throws TException {
+       iface.callback(args.callbackInfo);
+       return new callback_result();
+     }
+   }
+
+ }
+
+ /** Server-side dispatcher that routes the {@code callback} message to an {@link AsyncIface}. */
+ public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
+   private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName());
+
+   public AsyncProcessor(I iface) {
+     super(iface, getProcessMap(new HashMap<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
+   }
+
+   /** Variant that adds this service's functions to a caller-supplied map. */
+   protected AsyncProcessor(I iface, Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
+     super(iface, getProcessMap(processMap));
+   }
+
+   /** Registers the {@code callback} function in {@code processMap} and returns the same map. */
+   private static <I extends AsyncIface> Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(Map<String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
+     processMap.put("callback", new callback());
+     return processMap;
+   }
+
+   /** Async process function for the two-way {@code callback} method. */
+   public static class callback<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, callback_args, Void> {
+     public callback() {
+       super("callback");
+     }
+
+     public callback_args getEmptyArgsInstance() {
+       final callback_args blank = new callback_args();
+       return blank;
+     }
+
+     /**
+      * Builds the handler that answers the request identified by {@code seqid} on {@code fb}.
+      * If sending the response fails, the failure is logged and the frame buffer is closed.
+      */
+     public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+       final org.apache.thrift.AsyncProcessFunction fcall = this;
+       return new AsyncMethodCallback<Void>() {
+         public void onComplete(Void o) {
+           final callback_result reply = new callback_result();
+           try {
+             fcall.sendResponse(fb, reply, org.apache.thrift.protocol.TMessageType.REPLY, seqid);
+           } catch (Exception e) {
+             LOGGER.error("Exception writing to internal frame buffer", e);
+             fb.close();
+           }
+         }
+
+         public void onError(Exception e) {
+           // NOTE(review): the cast assumes TApplicationException is a TBase in the Thrift
+           // version on the classpath - confirm.
+           final org.apache.thrift.TBase failure =
+               (org.apache.thrift.TBase) new org.apache.thrift.TApplicationException(
+                   org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+           try {
+             fcall.sendResponse(fb, failure, org.apache.thrift.protocol.TMessageType.EXCEPTION, seqid);
+           } catch (Exception ex) {
+             LOGGER.error("Exception writing to internal frame buffer", ex);
+             fb.close();
+           }
+         }
+       };
+     }
+
+     protected boolean isOneway() {
+       return false;
+     }
+
+     /** Forwards the decoded argument and the result handler to the service implementation. */
+     public void start(I iface, callback_args args, AsyncMethodCallback<Void> resultHandler) throws TException {
+       iface.callback(args.callbackInfo, resultHandler);
+     }
+   }
+
+ }
+
+ public static class callback_args implements org.apache.thrift.TBase<callback_args, callback_args._Fields>, java.io.Serializable, Cloneable, Comparable<callback_args> {
+ // Struct descriptor written at the start of the standard-scheme encoding.
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("callback_args");
+
+ // Descriptor of field id 1: "callbackInfo", wire type STRUCT.
+ private static final org.apache.thrift.protocol.TField CALLBACK_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("callbackInfo", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+ // Scheme class -> factory lookup that read()/write() use to pick the encoding.
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new callback_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new callback_argsTupleSchemeFactory());
+ }
+
+ // The single argument of callback(); null means "not set" (see isSetCallbackInfo()).
+ public CallbackInfo callbackInfo; // required
+
+ /** Enumerates the fields of {@code callback_args} and offers lookups by Thrift id and by name. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+   CALLBACK_INFO((short)1, "callbackInfo");
+
+   private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+   static {
+     for (_Fields candidate : values()) {
+       byName.put(candidate.getFieldName(), candidate);
+     }
+   }
+
+   /**
+    * Looks a field up by its Thrift id.
+    *
+    * @return the matching constant, or null when no field has that id
+    */
+   public static _Fields findByThriftId(int fieldId) {
+     return (fieldId == 1) ? CALLBACK_INFO : null;
+   }
+
+   /**
+    * Looks a field up by its Thrift id, rejecting unknown ids.
+    *
+    * @throws IllegalArgumentException if no field has that id
+    */
+   public static _Fields findByThriftIdOrThrow(int fieldId) {
+     final _Fields match = findByThriftId(fieldId);
+     if (match != null) {
+       return match;
+     }
+     throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+   }
+
+   /**
+    * Looks a field up by its name.
+    *
+    * @return the matching constant, or null when no field has that name
+    */
+   public static _Fields findByName(String name) {
+     return byName.get(name);
+   }
+
+   private final short _thriftId;
+   private final String _fieldName;
+
+   _Fields(short thriftId, String fieldName) {
+     this._thriftId = thriftId;
+     this._fieldName = fieldName;
+   }
+
+   public short getThriftFieldId() {
+     return this._thriftId;
+   }
+
+   public String getFieldName() {
+     return this._fieldName;
+   }
+ }
+
+ /** Unmodifiable per-field metadata, also registered with {@code FieldMetaData} for this class. */
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+   final org.apache.thrift.meta_data.FieldMetaData callbackInfoMeta =
+       new org.apache.thrift.meta_data.FieldMetaData(
+           "callbackInfo",
+           org.apache.thrift.TFieldRequirementType.DEFAULT,
+           new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, CallbackInfo.class));
+   final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> byField =
+       new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+   byField.put(_Fields.CALLBACK_INFO, callbackInfoMeta);
+   metaDataMap = Collections.unmodifiableMap(byField);
+   org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_args.class, metaDataMap);
+ }
+
+ /** Creates an instance whose {@code callbackInfo} is unset (null). */
+ public callback_args() {
+ }
+
+ /**
+  * Creates an instance holding {@code callbackInfo}; the reference is stored, not copied.
+  *
+  * @param callbackInfo value for the single field, may be null
+  */
+ public callback_args(CallbackInfo callbackInfo) {
+   this();
+   this.callbackInfo = callbackInfo;
+ }
+
+ /**
+  * Deep-copy constructor: when {@code other} has its field set, a new {@link CallbackInfo}
+  * is built from it; otherwise the field stays unset.
+  */
+ public callback_args(callback_args other) {
+   if (!other.isSetCallbackInfo()) {
+     return;
+   }
+   this.callbackInfo = new CallbackInfo(other.callbackInfo);
+ }
+
+ /** Returns a deep copy of this struct, built with the copy constructor. */
+ public callback_args deepCopy() {
+   final callback_args duplicate = new callback_args(this);
+   return duplicate;
+ }
+
+ /** Resets the struct to its freshly-constructed state by unsetting {@code callbackInfo}. */
+ @Override
+ public void clear() {
+ this.callbackInfo = null;
+ }
+
+ /** Returns the current {@code callbackInfo} reference, or null when the field is unset. */
+ public CallbackInfo getCallbackInfo() {
+ return this.callbackInfo;
+ }
+
+ /**
+  * Stores {@code callbackInfo} (passing null unsets the field).
+  *
+  * @return this instance, so calls can be chained
+  */
+ public callback_args setCallbackInfo(CallbackInfo callbackInfo) {
+ this.callbackInfo = callbackInfo;
+ return this;
+ }
+
+ /** Marks {@code callbackInfo} as unset by nulling the reference. */
+ public void unsetCallbackInfo() {
+ this.callbackInfo = null;
+ }
+
+ /** Returns true if {@code callbackInfo} is set, i.e. the reference is non-null; false otherwise. */
+ public boolean isSetCallbackInfo() {
+ return this.callbackInfo != null;
+ }
+
+ /**
+  * Adjusts the "is set" state of {@code callbackInfo}. Passing false nulls the field;
+  * passing true is a no-op because presence is tracked by the reference itself.
+  */
+ public void setCallbackInfoIsSet(boolean value) {
+   if (value) {
+     return;
+   }
+   this.callbackInfo = null;
+ }
+
+ /**
+  * Generic setter: assigns {@code value} to {@code field}, or unsets the field when
+  * {@code value} is null.
+  */
+ public void setFieldValue(_Fields field, Object value) {
+   switch (field) {
+     case CALLBACK_INFO:
+       if (value != null) {
+         setCallbackInfo((CallbackInfo) value);
+       } else {
+         unsetCallbackInfo();
+       }
+       break;
+   }
+ }
+
+ /**
+  * Generic getter: returns the current value of {@code field}.
+  *
+  * @throws IllegalStateException if {@code field} is not handled by the switch
+  */
+ public Object getFieldValue(_Fields field) {
+   switch (field) {
+     case CALLBACK_INFO:
+       return getCallbackInfo();
+     default:
+       throw new IllegalStateException();
+   }
+ }
+
+ /**
+  * Reports whether {@code field} currently has a value.
+  *
+  * @throws IllegalArgumentException if {@code field} is null
+  * @throws IllegalStateException if {@code field} is not handled by the switch
+  */
+ public boolean isSet(_Fields field) {
+   if (field == null) {
+     throw new IllegalArgumentException();
+   }
+   switch (field) {
+     case CALLBACK_INFO:
+       return isSetCallbackInfo();
+     default:
+       throw new IllegalStateException();
+   }
+ }
+
+ /** Equal only to another {@code callback_args} that passes the typed {@code equals} overload. */
+ @Override
+ public boolean equals(Object that) {
+   return (that instanceof callback_args) && this.equals((callback_args) that);
+ }
+
+ /**
+  * Typed equality: both sides must agree on whether {@code callbackInfo} is set and,
+  * when it is set, the two values must be equal.
+  */
+ public boolean equals(callback_args that) {
+   if (that == null) {
+     return false;
+   }
+   final boolean mineSet = this.isSetCallbackInfo();
+   final boolean theirsSet = that.isSetCallbackInfo();
+   if (mineSet != theirsSet) {
+     return false;
+   }
+   if (!mineSet) {
+     // unset on both sides
+     return true;
+   }
+   return this.callbackInfo.equals(that.callbackInfo);
+ }
+
+ /**
+  * Hash of the list {@code [isSet]} or {@code [isSet, callbackInfo]}, depending on whether
+  * the field is set.
+  */
+ @Override
+ public int hashCode() {
+   final boolean hasCallbackInfo = isSetCallbackInfo();
+   final List<Object> parts = new ArrayList<Object>();
+   parts.add(Boolean.valueOf(hasCallbackInfo));
+   if (hasCallbackInfo) {
+     parts.add(callbackInfo);
+   }
+   return parts.hashCode();
+ }
+
+ /**
+  * Orders by class name when the runtime classes differ, then by whether
+  * {@code callbackInfo} is set, then by the {@code callbackInfo} values themselves.
+  */
+ @Override
+ public int compareTo(callback_args other) {
+   if (!getClass().equals(other.getClass())) {
+     return getClass().getName().compareTo(other.getClass().getName());
+   }
+
+   final int bySetFlag = Boolean.valueOf(isSetCallbackInfo()).compareTo(other.isSetCallbackInfo());
+   if (bySetFlag != 0) {
+     return bySetFlag;
+   }
+   if (!isSetCallbackInfo()) {
+     return 0;
+   }
+   return org.apache.thrift.TBaseHelper.compareTo(this.callbackInfo, other.callbackInfo);
+ }
+
+ /**
+  * Resolves a Thrift field id to its {@code _Fields} constant.
+  *
+  * @return the matching constant, or null when no field has that id
+  */
+ public _Fields fieldForId(int fieldId) {
+   final _Fields resolved = _Fields.findByThriftId(fieldId);
+   return resolved;
+ }
+
+ /** Deserializes this struct from {@code iprot} using the scheme registered for the protocol. */
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+   final SchemeFactory factory = schemes.get(iprot.getScheme());
+   factory.getScheme().read(iprot, this);
+ }
+
+ /** Serializes this struct to {@code oprot} using the scheme registered for the protocol. */
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+   final SchemeFactory factory = schemes.get(oprot.getScheme());
+   factory.getScheme().write(oprot, this);
+ }
+
+ /**
+  * Renders this struct as {@code callback_args(callbackInfo:<value>)}; an unset field is
+  * printed as the literal text {@code null}.
+  */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder("callback_args(");
+   text.append("callbackInfo:");
+   if (this.callbackInfo != null) {
+     text.append(this.callbackInfo);
+   } else {
+     text.append("null");
+   }
+   return text.append(")").toString();
+ }
+
+ /**
+  * Validates the nested {@code callbackInfo} when it is set; there are no required-field checks.
+  *
+  * @throws org.apache.thrift.TException if the nested struct's validation fails
+  */
+ public void validate() throws org.apache.thrift.TException {
+   if (callbackInfo == null) {
+     return;
+   }
+   callbackInfo.validate();
+ }
+
+ /**
+  * Java-serialization hook: encodes this struct onto {@code out} with the Thrift compact protocol.
+  *
+  * @throws java.io.IOException wrapping any {@code TException} raised while encoding
+  */
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+   try {
+     org.apache.thrift.transport.TIOStreamTransport sink =
+         new org.apache.thrift.transport.TIOStreamTransport(out);
+     org.apache.thrift.protocol.TCompactProtocol encoder =
+         new org.apache.thrift.protocol.TCompactProtocol(sink);
+     write(encoder);
+   } catch (org.apache.thrift.TException te) {
+     throw new java.io.IOException(te);
+   }
+ }
+
+ /**
+  * Java-serialization hook: decodes this struct from {@code in} with the Thrift compact protocol.
+  *
+  * @throws java.io.IOException wrapping any {@code TException} raised while decoding
+  * @throws ClassNotFoundException declared by the hook's signature; not thrown by this body
+  */
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+   try {
+     org.apache.thrift.transport.TIOStreamTransport source =
+         new org.apache.thrift.transport.TIOStreamTransport(in);
+     org.apache.thrift.protocol.TCompactProtocol decoder =
+         new org.apache.thrift.protocol.TCompactProtocol(source);
+     read(decoder);
+   } catch (org.apache.thrift.TException te) {
+     throw new java.io.IOException(te);
+   }
+ }
+
+ /** Factory registered for {@code StandardScheme}; returns a fresh scheme on every call. */
+ private static class callback_argsStandardSchemeFactory implements SchemeFactory {
+   public callback_argsStandardScheme getScheme() {
+     final callback_argsStandardScheme scheme = new callback_argsStandardScheme();
+     return scheme;
+   }
+ }
+
+ /** Field-tagged encoding of {@code callback_args}: field id 1 carries the nested struct. */
+ private static class callback_argsStandardScheme extends StandardScheme<callback_args> {
+
+   /**
+    * Reads fields into {@code struct} until the STOP marker. A field with an unknown id, or
+    * id 1 with a non-STRUCT wire type, is skipped.
+    */
+   public void read(org.apache.thrift.protocol.TProtocol iprot, callback_args struct) throws org.apache.thrift.TException {
+     iprot.readStructBegin();
+     for (org.apache.thrift.protocol.TField header = iprot.readFieldBegin();
+         header.type != org.apache.thrift.protocol.TType.STOP;
+         header = iprot.readFieldBegin()) {
+       final boolean isCallbackInfo =
+           header.id == 1 && header.type == org.apache.thrift.protocol.TType.STRUCT;
+       if (isCallbackInfo) {
+         struct.callbackInfo = new CallbackInfo();
+         struct.callbackInfo.read(iprot);
+         struct.setCallbackInfoIsSet(true);
+       } else {
+         // unknown id, or a known id whose wire type does not match
+         org.apache.thrift.protocol.TProtocolUtil.skip(iprot, header.type);
+       }
+       iprot.readFieldEnd();
+     }
+     iprot.readStructEnd();
+
+     // check for required fields of primitive type, which can't be checked in the validate method
+     struct.validate();
+   }
+
+   /** Writes {@code struct} after validating it; the field is emitted only when non-null. */
+   public void write(org.apache.thrift.protocol.TProtocol oprot, callback_args struct) throws org.apache.thrift.TException {
+     struct.validate();
+
+     oprot.writeStructBegin(STRUCT_DESC);
+
+     final CallbackInfo payload = struct.callbackInfo;
+     if (payload != null) {
+       oprot.writeFieldBegin(CALLBACK_INFO_FIELD_DESC);
+       payload.write(oprot);
+       oprot.writeFieldEnd();
+     }
+
+     oprot.writeFieldStop();
+     oprot.writeStructEnd();
+   }
+
+ }
+
+ /** Factory registered for {@code TupleScheme}; returns a fresh scheme on every call. */
+ private static class callback_argsTupleSchemeFactory implements SchemeFactory {
+   public callback_argsTupleScheme getScheme() {
+     final callback_argsTupleScheme scheme = new callback_argsTupleScheme();
+     return scheme;
+   }
+ }
+
+ /**
+  * Tuple encoding of {@code callback_args}: a 1-bit presence set followed by the nested
+  * struct when bit 0 is on.
+  */
+ private static class callback_argsTupleScheme extends TupleScheme<callback_args> {
+
+   /** Writes the presence bit, then the nested struct if it is set. */
+   @Override
+   public void write(org.apache.thrift.protocol.TProtocol prot, callback_args struct) throws org.apache.thrift.TException {
+     final TTupleProtocol tuple = (TTupleProtocol) prot;
+     final boolean hasCallbackInfo = struct.isSetCallbackInfo();
+
+     final BitSet present = new BitSet();
+     present.set(0, hasCallbackInfo);
+     tuple.writeBitSet(present, 1);
+
+     if (hasCallbackInfo) {
+       struct.callbackInfo.write(tuple);
+     }
+   }
+
+   /** Reads the presence bit, then the nested struct if the bit is on. */
+   @Override
+   public void read(org.apache.thrift.protocol.TProtocol prot, callback_args struct) throws org.apache.thrift.TException {
+     final TTupleProtocol tuple = (TTupleProtocol) prot;
+     final BitSet present = tuple.readBitSet(1);
+     if (present.get(0)) {
+       struct.callbackInfo = new CallbackInfo();
+       struct.callbackInfo.read(tuple);
+       struct.setCallbackInfoIsSet(true);
+     }
+   }
+ }
+
+ }
+
+ public static class callback_result implements org.apache.thrift.TBase<callback_result, callback_result._Fields>, java.io.Serializable, Cloneable, Comparable<callback_result> {
+ // Struct descriptor written at the start of the standard-scheme encoding.
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("callback_result");
+
+
+ // Scheme class -> factory lookup that read()/write() use to pick the encoding.
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new callback_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new callback_resultTupleSchemeFactory());
+ }
+
+
+ /**
+  * Field enumeration of {@code callback_result}. The struct has no fields, so the enum has
+  * no constants and every lookup comes back empty.
+  */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+   ;
+
+   private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+   static {
+     for (_Fields candidate : values()) {
+       byName.put(candidate.getFieldName(), candidate);
+     }
+   }
+
+   /**
+    * Looks a field up by its Thrift id.
+    *
+    * @return always null, because no field ids are defined
+    */
+   public static _Fields findByThriftId(int fieldId) {
+     return null;
+   }
+
+   /**
+    * Looks a field up by its Thrift id, rejecting unknown ids.
+    *
+    * @throws IllegalArgumentException if no field has that id
+    */
+   public static _Fields findByThriftIdOrThrow(int fieldId) {
+     final _Fields match = findByThriftId(fieldId);
+     if (match != null) {
+       return match;
+     }
+     throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+   }
+
+   /**
+    * Looks a field up by its name.
+    *
+    * @return the matching constant, or null when no field has that name
+    */
+   public static _Fields findByName(String name) {
+     return byName.get(name);
+   }
+
+   private final short _thriftId;
+   private final String _fieldName;
+
+   _Fields(short thriftId, String fieldName) {
+     this._thriftId = thriftId;
+     this._fieldName = fieldName;
+   }
+
+   public short getThriftFieldId() {
+     return this._thriftId;
+   }
+
+   public String getFieldName() {
+     return this._fieldName;
+   }
+ }
+ /** Unmodifiable (and empty) field metadata, also registered with {@code FieldMetaData}. */
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+   final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> byField =
+       new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+   metaDataMap = Collections.unmodifiableMap(byField);
+   org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(callback_result.class, metaDataMap);
+ }
+
+ /** Creates the (field-less) result struct. */
+ public callback_result() {
+ }
+
+ /**
+  * Performs a deep copy on <i>other</i>. The struct has no fields, so nothing is read
+  * from {@code other}.
+  */
+ public callback_result(callback_result other) {
+ }
+
+ /** Returns a new instance built with the copy constructor. */
+ public callback_result deepCopy() {
+   final callback_result duplicate = new callback_result(this);
+   return duplicate;
+ }
+
+ /** No-op: the struct has no fields to reset. */
+ @Override
+ public void clear() {
+ }
+
+ /**
+  * Generic setter. The switch has no cases because the struct has no fields, so a non-null
+  * {@code field} falls straight through; switching on a null {@code field} raises a
+  * {@code NullPointerException}.
+  */
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ /**
+  * Generic getter. The switch has no cases because the struct has no fields, so any
+  * non-null {@code field} ends in the trailing throw.
+  *
+  * @throws IllegalStateException for every non-null {@code field}
+  */
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /**
+  * Reports whether {@code field} is set. The switch has no cases because the struct has no
+  * fields, so no call returns normally.
+  *
+  * @throws IllegalArgumentException if {@code field} is null
+  * @throws IllegalStateException for every non-null {@code field}
+  */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Equal only to another {@code callback_result} that passes the typed {@code equals} overload. */
+ @Override
+ public boolean equals(Object that) {
+   return (that instanceof callback_result) && this.equals((callback_result) that);
+ }
+
+ /** Typed equality: with no fields to compare, every non-null instance is equal. */
+ public boolean equals(callback_result that) {
+   return that != null;
+ }
+
+ /** Hash of an empty list, since the struct has no fields to contribute. */
+ @Override
+ public int hashCode() {
+   final List<Object> parts = new ArrayList<Object>();
+   return parts.hashCode();
+ }
+
+ /** Orders by class name when the runtime classes differ; otherwise the two are equal (0). */
+ @Override
+ public int compareTo(callback_result other) {
+   final Class<?> mine = getClass();
+   final Class<?> theirs = other.getClass();
+   return mine.equals(theirs) ? 0 : mine.getName().compareTo(theirs.getName());
+ }
+
+ /**
+  * Resolves a Thrift field id to its {@code _Fields} constant.
+  *
+  * @return null for every id, because this struct defines no fields
+  */
+ public _Fields fieldForId(int fieldId) {
+   final _Fields resolved = _Fields.findByThriftId(fieldId);
+   return resolved;
+ }
+
+ /** Deserializes this struct from {@code iprot} using the scheme registered for the protocol. */
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+   final SchemeFactory factory = schemes.get(iprot.getScheme());
+   factory.getScheme().read(iprot, this);
+ }
+
+ /** Serializes this struct to {@code oprot} using the scheme registered for the protocol. */
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+   final SchemeFactory factory = schemes.get(oprot.getScheme());
+   factory.getScheme().write(oprot, this);
+ }
+
+ /** Renders the field-less struct as {@code callback_result()}. */
+ @Override
+ public String toString() {
+   return new StringBuilder("callback_result(").append(")").toString();
+ }
+
+ /**
+  * Validation hook invoked by the standard scheme before writing and after reading.
+  * The body is intentionally empty: the struct has no fields to check.
+  *
+  * @throws org.apache.thrift.TException part of the hook's signature; this body never throws it
+  */
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ /**
+  * Java-serialization hook: encodes this struct onto {@code out} with the Thrift compact protocol.
+  *
+  * @throws java.io.IOException wrapping any {@code TException} raised while encoding
+  */
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+   try {
+     org.apache.thrift.transport.TIOStreamTransport sink =
+         new org.apache.thrift.transport.TIOStreamTransport(out);
+     org.apache.thrift.protocol.TCompactProtocol encoder =
+         new org.apache.thrift.protocol.TCompactProtocol(sink);
+     write(encoder);
+   } catch (org.apache.thrift.TException te) {
+     throw new java.io.IOException(te);
+   }
+ }
+
+ /**
+  * Java-serialization hook: decodes this struct from {@code in} with the Thrift compact protocol.
+  *
+  * @throws java.io.IOException wrapping any {@code TException} raised while decoding
+  * @throws ClassNotFoundException declared by the hook's signature; not thrown by this body
+  */
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+   try {
+     org.apache.thrift.transport.TIOStreamTransport source =
+         new org.apache.thrift.transport.TIOStreamTransport(in);
+     org.apache.thrift.protocol.TCompactProtocol decoder =
+         new org.apache.thrift.protocol.TCompactProtocol(source);
+     read(decoder);
+   } catch (org.apache.thrift.TException te) {
+     throw new java.io.IOException(te);
+   }
+ }
+
+ /** Factory registered for {@code StandardScheme}; returns a fresh scheme on every call. */
+ private static class callback_resultStandardSchemeFactory implements SchemeFactory {
+   public callback_resultStandardScheme getScheme() {
+     final callback_resultStandardScheme scheme = new callback_resultStandardScheme();
+     return scheme;
+   }
+ }
+
+ /** Field-tagged encoding of the field-less {@code callback_result}. */
+ private static class callback_resultStandardScheme extends StandardScheme<callback_result> {
+
+   /** Consumes the struct from {@code iprot}, skipping every field up to the STOP marker. */
+   public void read(org.apache.thrift.protocol.TProtocol iprot, callback_result struct) throws org.apache.thrift.TException {
+     iprot.readStructBegin();
+     for (org.apache.thrift.protocol.TField header = iprot.readFieldBegin();
+         header.type != org.apache.thrift.protocol.TType.STOP;
+         header = iprot.readFieldBegin()) {
+       // no field ids are defined for this struct, so every incoming field is skipped
+       org.apache.thrift.protocol.TProtocolUtil.skip(iprot, header.type);
+       iprot.readFieldEnd();
+     }
+     iprot.readStructEnd();
+
+     // check for required fields of primitive type, which can't be checked in the validate method
+     struct.validate();
+   }
+
+   /** Validates {@code struct}, then writes an empty struct: begin, STOP marker, end. */
+   public void write(org.apache.thrift.protocol.TProtocol oprot, callback_result struct) throws org.apache.thrift.TException {
+     struct.validate();
+
+     oprot.writeStructBegin(STRUCT_DESC);
+     oprot.writeFieldStop();
+     oprot.writeStructEnd();
+   }
+
+ }
+
+ /** Factory registered for {@code TupleScheme}; returns a fresh scheme on every call. */
+ private static class callback_resultTupleSchemeFactory implements SchemeFactory {
+   public callback_resultTupleScheme getScheme() {
+     final callback_resultTupleScheme scheme = new callback_resultTupleScheme();
+     return scheme;
+   }
+ }
+
+ /**
+  * Tuple encoding of the field-less {@code callback_result}: nothing is written or read.
+  * Both methods still cast the protocol, so a non-tuple protocol fails with a
+  * {@code ClassCastException}.
+  */
+ private static class callback_resultTupleScheme extends TupleScheme<callback_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, callback_result struct) throws org.apache.thrift.TException {
+ // the cast is the only effect: there are no fields to write
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, callback_result struct) throws org.apache.thrift.TException {
+ // the cast is the only effect: there are no fields to read
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
index f2eb13f..f20fb90 100644
--- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
+++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift
@@ -88,6 +88,11 @@ struct InterpreterCompletion {
3: string meta
}
+struct CallbackInfo { // Host/port pair; the argument type of RemoteInterpreterCallbackService.callback
+ 1: string host,
+ 2: i32 port
+}
+
service RemoteInterpreterService {
void createInterpreter(1: string intpGroupId, 2: string sessionKey, 3: string className, 4: map<string, string> properties, 5: string userName);
@@ -131,3 +136,7 @@ service RemoteInterpreterService {
void onReceivedZeppelinResource(1: string object);
}
+
+service RemoteInterpreterCallbackService { // Service with a single call that delivers one CallbackInfo
+ void callback(1: CallbackInfo callbackInfo);
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
index a4b3a25..1a7c2a5 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
@@ -42,7 +42,7 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStop() throws InterruptedException, IOException, TException {
- RemoteInterpreterServer server = new RemoteInterpreterServer(
+ RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
assertEquals(false, server.isRunning());
@@ -90,8 +90,8 @@ public class RemoteInterpreterServerTest {
@Test
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException {
- RemoteInterpreterServer server = new RemoteInterpreterServer(
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces());
+ RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
+ RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), true);
assertEquals(false, server.isRunning());
server.start();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
index 5f7426a..afbbcbd 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -28,6 +28,17 @@ public class RemoteInterpreterUtilsTest {
@Test
public void testFindRandomAvailablePortOnAllLocalInterfaces() throws IOException {
assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces() > 0);
+
+ String portRange = ":30000"; // only the part after ':' is given; the port must be at most 30000
+ assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) <= 30000);
+
+ portRange = "30000:"; // only the part before ':' is given; the port must be at least 30000
+ assertTrue(RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange) >= 30000);
+
+ portRange = "30000:40000"; // both parts given; the port must lie within [30000, 40000]
+ int port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
+ assertTrue(port >= 30000 && port <= 40000);
}
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 03cc069..ba90ed8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -476,6 +476,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
}
}
+ public String getCallbackPortRange() { // Returns the string value configured for ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE.
+ return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
+ }
+
public boolean isWindowsPath(String path){
return path.matches("^[A-Za-z]:\\\\.*");
}
@@ -684,7 +688,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
ZEPPELIN_HDFS_KEYTAB("zeppelin.hdfs.keytab", ""),
- ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", "");
+ ZEPPELIN_HDFS_PRINCIPAL("zeppelin.hdfs.principal", ""),
+
+ ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":");
private String varName;
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 9f4cfd4..79618a3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -705,7 +705,8 @@ public class InterpreterSetting {
// create new remote process
remoteInterpreterProcess = new RemoteInterpreterManagedProcess(
interpreterRunner != null ? interpreterRunner.getPath() :
- conf.getInterpreterRemoteRunnerPath(), interpreterDir, localRepoPath,
+ conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
+ interpreterDir, localRepoPath,
getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
remoteInterpreterProcessListener, appEventListener, group);
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 19356fb..2d64831 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -17,10 +17,23 @@
package org.apache.zeppelin.interpreter.remote;
-import org.apache.commons.exec.*;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.exec.ExecuteResultHandler;
+import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.exec.LogOutputStream;
+import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.thrift.CallbackInfo;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterCallbackService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +42,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class manages start / stop of remote interpreter process
@@ -37,11 +51,14 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
implements ExecuteResultHandler {
private static final Logger logger = LoggerFactory.getLogger(
RemoteInterpreterManagedProcess.class);
- private final String interpreterRunner;
+ private final String interpreterRunner;
+ private final String portRange;
private DefaultExecutor executor;
private ExecuteWatchdog watchdog;
- boolean running = false;
+ private AtomicBoolean running = new AtomicBoolean(false);
+ TServer callbackServer;
+ private String host = null;
private int port = -1;
private final String interpreterDir;
private final String localRepoDir;
@@ -51,6 +68,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
public RemoteInterpreterManagedProcess(
String intpRunner,
+ String portRange,
String intpDir,
String localRepoDir,
Map<String, String> env,
@@ -61,6 +79,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
super(new RemoteInterpreterEventPoller(listener, appListener),
connectTimeout);
this.interpreterRunner = intpRunner;
+ this.portRange = portRange;
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
@@ -77,6 +96,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
super(remoteInterpreterEventPoller,
connectTimeout);
this.interpreterRunner = intpRunner;
+ this.portRange = ":";
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
@@ -96,18 +116,69 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
@Override
public void start(String userName, Boolean isUserImpersonate) {
// start server process
+ final String callbackHost;
+ final int callbackPort;
try {
- port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(portRange);
logger.info("Choose port {} for RemoteInterpreterProcess", port);
+ callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
+ callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e1) {
throw new InterpreterException(e1);
}
+ logger.info("Thrift server for callback will start. Port: {}", callbackPort);
+ try {
+ callbackServer = new TThreadPoolServer(
+ new TThreadPoolServer.Args(new TServerSocket(callbackPort)).processor(
+ new RemoteInterpreterCallbackService.Processor<>(
+ new RemoteInterpreterCallbackService.Iface() {
+ @Override
+ public void callback(CallbackInfo callbackInfo) throws TException {
+ logger.info("Registered: {}", callbackInfo);
+ host = callbackInfo.getHost();
+ port = callbackInfo.getPort();
+ running.set(true);
+ synchronized (running) {
+ running.notify();
+ }
+ }
+ })));
+ // Start thrift server to receive callbackInfo from RemoteInterpreterServer;
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ callbackServer.serve();
+ }
+ }).start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ if (callbackServer.isServing()) {
+ callbackServer.stop();
+ }
+ }
+ }));
+
+ while (!callbackServer.isServing()) {
+ logger.debug("callbackServer is not serving");
+ Thread.sleep(500);
+ }
+ logger.debug("callbackServer is serving now");
+ } catch (TTransportException e) {
+ logger.error("callback server error.", e);
+ } catch (InterruptedException e) {
+ logger.warn("", e);
+ }
+
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
cmdLine.addArgument(interpreterDir, false);
+ cmdLine.addArgument("-c", false);
+ cmdLine.addArgument(callbackHost, false);
cmdLine.addArgument("-p", false);
- cmdLine.addArgument(Integer.toString(port), false);
+ cmdLine.addArgument(Integer.toString(callbackPort), false);
if (isUserImpersonate && !userName.equals("anonymous")) {
cmdLine.addArgument("-u", false);
cmdLine.addArgument(userName, false);
@@ -133,45 +204,31 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
logger.info("Run interpreter process {}", cmdLine);
executor.execute(cmdLine, procEnv, this);
- running = true;
} catch (IOException e) {
- running = false;
+ running.set(false);
throw new InterpreterException(e);
}
-
- long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
- if (!running) {
- try {
- cmdOut.flush();
- } catch (IOException e) {
- // nothing to do
+ try {
+ synchronized (running) {
+ if (!running.get()) {
+ running.wait(getConnectTimeout() * 2);
}
- throw new InterpreterException(new String(cmdOut.toByteArray()));
}
-
- try {
- if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) {
- break;
- } else {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- logger.error("Exception in RemoteInterpreterProcess while synchronized reference " +
- "Thread.sleep", e);
- }
- }
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Remote interpreter not yet accessible at localhost:" + port);
- }
+ if (!running.get()) {
+ callbackServer.stop();
+ throw new InterpreterException("Cannot run interpreter");
}
+ } catch (InterruptedException e) {
+ logger.error("Remote interpreter is not accessible");
}
processOutput.setOutputStream(null);
}
public void stop() {
+ if (callbackServer.isServing()) {
+ callbackServer.stop();
+ }
if (isRunning()) {
logger.info("kill interpreter process");
try {
@@ -190,25 +247,25 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
executor = null;
watchdog = null;
- running = false;
+ running.set(false);
logger.info("Remote process terminated");
}
@Override
public void onProcessComplete(int exitValue) {
logger.info("Interpreter process exited {}", exitValue);
- running = false;
+ running.set(false); // clears the running flag once the exit has been logged
}
@Override
public void onProcessFailed(ExecuteException e) {
logger.info("Interpreter process failed {}", e);
- running = false;
+ running.set(false); // clears the running flag once the failure has been logged
}
public boolean isRunning() {
- return running;
+ return running.get(); // current value of the running flag
}
private static class ProcessLogOutputStream extends LogOutputStream {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1812928b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index f4b8c32..f044fbd 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -357,16 +357,16 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
config.put("cron", "* * * * * ?");
note.setConfig(config);
notebook.refreshCron(note.getId());
- Thread.sleep(1 * 1000);
+ Thread.sleep(2 * 1000);
// remove cron scheduler.
config.put("cron", null);
note.setConfig(config);
notebook.refreshCron(note.getId());
- Thread.sleep(1000);
+ Thread.sleep(2 * 1000);
dateFinished = p.getDateFinished();
assertNotNull(dateFinished);
- Thread.sleep(1 * 1000);
+ Thread.sleep(2 * 1000);
assertEquals(dateFinished, p.getDateFinished());
notebook.removeNote(note.getId(), anonymous);
}