You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by li...@apache.org on 2019/06/13 07:42:11 UTC
[zeppelin] branch master updated: [ZEPPELIN-3623] Create
interpreter process in the cluster mode
This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new fd6e80c [ZEPPELIN-3623] Create interpreter process in the cluster mode
fd6e80c is described below
commit fd6e80cfc49647ac57ca53e1dff9cdfbae164865
Author: Xun Liu <li...@apache.org>
AuthorDate: Thu Jun 13 11:22:45 2019 +0800
[ZEPPELIN-3623] Create interpreter process in the cluster mode
### What is this PR for?
In distributed cluster deployment mode, look for servers with idle resources in the cluster and create an interpreter process. In single-server deployment mode, the interpreter process is created directly on the machine.
Just set the cluster server list in `zeppelin-site.xml`
```
<property>
<name>zeppelin.cluster.addr</name>
<value>10.120.196.234:6000,10.120.196.235:6000,10.120.196.236:6000</value>
</property>
```
The interpreter process can be created in the cluster server through any zeppelin server.
[Cluster Design Document](https://docs.google.com/document/d/1a8QLSyR3M5AhlG1GIYuDTj6bwazeuVDKCRRBm-Qa3Bw/edit#heading=h.s41ckl271z8s)
### What type of PR is it?
[Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3623
### How should this be tested?
* [CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/537437337)
### Screenshots (if appropriate)
#### The note is executed on the zeppelin-server-234 and the interpreter process is created on the zeppelin-server-236 host.
![ClusterCreateIntpProcess](https://user-images.githubusercontent.com/3677382/58382741-7dc61680-8000-11e9-8379-78b8e6743b63.gif)
### Questions:
* Does the licenses files need update?
* Is there breaking changes for older versions?
* Does this needs documentation?
Author: Xun Liu <li...@apache.org>
Closes #3372 from liuxunorg/ZEPPELIN-3623 and squashes the following commits:
ea4ded954 [Xun Liu] Delete unused cluster thrift interfaces
712bba50f [Xun Liu] Add licese header.
3c8922cb6 [Xun Liu] Get the conf path through the `ZEPPELIN_HOME` or `ZEPPELIN_CONF_DIR` environment variable.
b88c09942 [Xun Liu] The ClusterInterpreterLauncherTest is tested, Turn off cluster.
83f540622 [Xun Liu] [ZEPPELIN-3623] Create interpreter process in the cluster mode
---
zeppelin-interpreter-api/pom.xml | 6 +-
.../apache/zeppelin/cluster/ClusterManager.java | 3 +-
.../zeppelin/cluster/ClusterManagerClient.java | 4 +-
.../zeppelin/cluster/ClusterManagerServer.java | 228 +++--
.../apache/zeppelin/cluster/ClusterMonitor.java | 16 +-
.../zeppelin/cluster/ClusterStateMachine.java | 18 +-
.../ClusterEvent.java} | 9 +-
.../ClusterEventListener.java} | 13 +-
.../apache/zeppelin/cluster/meta/ClusterMeta.java | 14 +-
.../zeppelin/cluster/meta/ClusterMetaType.java | 4 +-
.../zeppelin/conf/ZeppelinConfiguration.java | 34 +-
.../remote/RemoteInterpreterServer.java | 57 +-
.../thrift/ClusterIntpProcParameters.java | 914 -------------------
.../interpreter/thrift/ClusterManagerService.java | 995 ---------------------
.../src/main/thrift/ClusterManagerService.thrift | 32 -
zeppelin-interpreter/src/main/thrift/genthrift.sh | 1 -
.../zeppelin/cluster/ClusterMultiNodeTest.java | 159 ++++
...ManagerTest.java => ClusterSingleNodeTest.java} | 66 +-
zeppelin-plugins/launcher/cluster/pom.xml | 90 ++
.../launcher/ClusterInterpreterLauncher.java | 200 +++++
.../launcher/ClusterInterpreterProcess.java | 141 +++
.../launcher/ClusterInterpreterLauncherTest.java | 101 +++
.../interpreter/launcher/ClusterMockTest.java | 79 +-
zeppelin-plugins/pom.xml | 1 +
.../org/apache/zeppelin/server/ZeppelinServer.java | 18 +-
.../zeppelin/interpreter/InterpreterSetting.java | 6 +
.../remote/RemoteInterpreterManagedProcess.java | 4 +
27 files changed, 1023 insertions(+), 2190 deletions(-)
diff --git a/zeppelin-interpreter-api/pom.xml b/zeppelin-interpreter-api/pom.xml
index a43ee89..c1f0ae8 100644
--- a/zeppelin-interpreter-api/pom.xml
+++ b/zeppelin-interpreter-api/pom.xml
@@ -68,8 +68,6 @@
<exclude>org.apache.commons:commons-exec</exclude>
<!-- Leave log4j unshaded so downstream users can configure logging. -->
<exclude>log4j:log4j</exclude>
- <exclude>com.esotericsoftware:kryo</exclude>
- <exclude>com.esotericsoftware:reflectasm</exclude>
</excludes>
</artifactSet>
<filters>
@@ -138,6 +136,10 @@
<pattern>io</pattern>
<shadedPattern>${shaded.dependency.prefix}.io</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.esotericsoftware</pattern>
+ <shadedPattern>${shaded.dependency.prefix}.com.esotericsoftware</shadedPattern>
+ </relocation>
</relocations>
</configuration>
<executions>
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
index 9c70a2e..2b2cd50 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -252,7 +252,7 @@ public abstract class ClusterManager {
while (!raftInitialized()) {
retry++;
if (0 == retry % 30) {
- LOGGER.error("Raft incomplete initialization! retry[{}]", retry);
+ LOGGER.warn("Raft incomplete initialization! retry[{}]", retry);
}
Thread.sleep(100);
}
@@ -268,6 +268,7 @@ public abstract class ClusterManager {
if (true == success) {
// The operation was successfully deleted
clusterMetaQueue.remove(metaEntity);
+ LOGGER.info("Cluster Meta Consume success! {}", metaEntity);
} else {
LOGGER.error("Cluster Meta Consume faild!");
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
index c969bd6..57d51e3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
@@ -18,7 +18,7 @@ package org.apache.zeppelin.cluster;
import io.atomix.primitive.PrimitiveState;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
/**
* Cluster management client class instantiated in zeppelin-interperter
@@ -63,7 +63,7 @@ public class ClusterManagerClient extends ClusterManager {
// Instantiated cluster monitoring class
clusterMonitor = new ClusterMonitor(this);
- clusterMonitor.start(IntpProcessMeta, metaKey);
+ clusterMonitor.start(INTP_PROCESS_META, metaKey);
}
public void shutdown() {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index 41e670a..eb64393 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -16,6 +16,8 @@
*/
package org.apache.zeppelin.cluster;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.*;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
@@ -30,20 +32,10 @@ import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import io.atomix.utils.net.Address;
import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterFactoryInterface;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.apache.zeppelin.interpreter.thrift.ClusterIntpProcParameters;
-import org.apache.zeppelin.interpreter.thrift.ClusterManagerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,20 +48,22 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
+import java.time.Duration;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.ServerMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.SERVER_META;
/**
* Cluster management server class instantiated in zeppelin-server
* 1. Create a raft server
* 2. Remotely create interpreter's thrift service
*/
-public class ClusterManagerServer extends ClusterManager
- implements ClusterManagerService.Iface {
+public class ClusterManagerServer extends ClusterManager {
private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerServer.class);
private static ClusterManagerServer instance = null;
@@ -79,22 +73,15 @@ public class ClusterManagerServer extends ClusterManager
protected MessagingService messagingService = null;
- // zeppelin cluster manager thrift service
- private TThreadPoolServer clusterManagerTserver = null;
- private ClusterManagerService.Processor<ClusterManagerServer> clusterManagerProcessor = null;
-
- // Find interpreter by note
- private InterpreterFactoryInterface interpreterFactory = null;
-
// Connect to the interpreter process that has been created
public static String CONNET_EXISTING_PROCESS = "CONNET_EXISTING_PROCESS";
+ private List<ClusterEventListener> clusterEventListeners = new ArrayList<>();
+ // zeppelin cluster event
+ public static String ZEPL_CLUSTER_EVENT_TOPIC = "ZEPL_CLUSTER_EVENT_TOPIC";
+
private ClusterManagerServer() {
super();
-
- clusterManagerProcessor = new ClusterManagerService.Processor<>(this);
-
- deleteRaftSystemData();
}
public static ClusterManagerServer getInstance() {
@@ -106,23 +93,46 @@ public class ClusterManagerServer extends ClusterManager
}
}
- public void start(InterpreterFactoryInterface interpreterFactory) {
+ public void start() {
if (!zconf.isClusterMode()) {
return;
}
- this.interpreterFactory = interpreterFactory;
-
initThread();
// Instantiated raftServer monitoring class
String clusterName = getClusterNodeName();
clusterMonitor = new ClusterMonitor(this);
- clusterMonitor.start(ServerMeta, clusterName);
+ clusterMonitor.start(SERVER_META, clusterName);
super.start();
}
+ @VisibleForTesting
+ public void initTestCluster(String clusterAddrList, String host, int port) {
+ this.zeplServerHost = host;
+ this.raftServerPort = port;
+
+ // clear
+ clusterNodes.clear();
+ raftAddressMap.clear();
+ clusterMemberIds.clear();
+
+ String cluster[] = clusterAddrList.split(",");
+ for (int i = 0; i < cluster.length; i++) {
+ String[] parts = cluster[i].split(":");
+ String clusterHost = parts[0];
+ int clusterPort = Integer.valueOf(parts[1]);
+
+ String memberId = clusterHost + ":" + clusterPort;
+ Address address = Address.from(clusterHost, clusterPort);
+ Node node = Node.builder().withId(memberId).withAddress(address).build();
+ clusterNodes.add(node);
+ raftAddressMap.put(MemberId.from(memberId), address);
+ clusterMemberIds.add(MemberId.from(memberId));
+ }
+ }
+
@Override
public boolean raftInitialized() {
if (null != raftServer && raftServer.isRunning()
@@ -145,32 +155,6 @@ public class ClusterManagerServer extends ClusterManager
return true;
}
- protected void deleteRaftSystemData() {
- String zeppelinHome = zconf.getZeppelinHome();
- Path directory = new File(zeppelinHome, ".data").toPath();
- if (Files.exists(directory)) {
- try {
- Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
- throws IOException {
- Files.delete(file);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path dir, IOException exc)
- throws IOException {
- Files.delete(dir);
- return FileVisitResult.CONTINUE;
- }
- });
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
private void initThread() {
// RaftServer Thread
new Thread(new Runnable() {
@@ -206,11 +190,15 @@ public class ClusterManagerServer extends ClusterManager
bootstrapService,
new MembershipConfig());
+ File atomixDateDir = com.google.common.io.Files.createTempDir();
+ atomixDateDir.deleteOnExit();
+
RaftServer.Builder builder = RaftServer.builder(member.id())
.withMembershipService(clusterService)
.withProtocol(protocol)
.withStorage(RaftStorage.builder()
.withStorageLevel(StorageLevel.MEMORY)
+ .withDirectory(atomixDateDir)
.withSerializer(storageSerializer)
.withMaxSegmentSize(1024 * 1024)
.build());
@@ -218,46 +206,10 @@ public class ClusterManagerServer extends ClusterManager
raftServer = builder.build();
raftServer.bootstrap(clusterMemberIds);
- LOGGER.info("RaftServer run() <<<");
- }
- }).start();
-
- // Cluster manager thrift thread
- new Thread(new Runnable() {
- @Override
- public void run() {
- LOGGER.info("TServerThread run() >>>");
-
- ZeppelinConfiguration zconf = new ZeppelinConfiguration();
- String portRange = zconf.getZeppelinServerRPCPortRange();
-
- try {
- TServerSocket serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
- int tserverPort = serverTransport.getServerSocket().getLocalPort();
-
- clusterManagerTserver = new TThreadPoolServer(
- new TThreadPoolServer.Args(serverTransport).processor(clusterManagerProcessor));
- LOGGER.info("Starting raftServer manager Tserver on port {}", tserverPort);
-
- String nodeName = getClusterNodeName();
- HashMap<String, Object> meta = new HashMap<String, Object>();
- meta.put(ClusterMeta.NODE_NAME, nodeName);
- meta.put(ClusterMeta.SERVER_TSERVER_HOST, zeplServerHost);
- meta.put(ClusterMeta.SERVER_TSERVER_PORT, tserverPort);
- meta.put(ClusterMeta.SERVER_START_TIME, new Date());
-
- putClusterMeta(ServerMeta, nodeName, meta);
- } catch (UnknownHostException e) {
- LOGGER.error(e.getMessage());
- } catch (SocketException e) {
- LOGGER.error(e.getMessage());
- } catch (IOException e) {
- LOGGER.error(e.getMessage());
- }
-
- clusterManagerTserver.serve();
+ messagingService.registerHandler(ZEPL_CLUSTER_EVENT_TOPIC,
+ subscribeClusterEvent, MoreExecutors.directExecutor());
- LOGGER.info("TServerThread run() <<<");
+ LOGGER.info("RaftServer run() <<<");
}
}).start();
}
@@ -270,58 +222,34 @@ public class ClusterManagerServer extends ClusterManager
try {
// delete local machine meta
- deleteClusterMeta(ServerMeta, getClusterNodeName());
+ deleteClusterMeta(SERVER_META, getClusterNodeName());
Thread.sleep(300);
clusterMonitor.shutdown();
// wait raft commit metadata
Thread.sleep(300);
} catch (InterruptedException e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error(e.getMessage(), e);
}
if (null != raftServer && raftServer.isRunning()) {
try {
raftServer.shutdown().get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error(e.getMessage(), e);
} catch (ExecutionException e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error(e.getMessage(), e);
} catch (TimeoutException e) {
- LOGGER.error(e.getMessage());
+ LOGGER.error(e.getMessage(), e);
}
}
- clusterManagerTserver.stop();
-
super.shutdown();
}
- public boolean openRemoteInterpreterProcess(
- String host, int port, final ClusterIntpProcParameters clusterIntpProcParameters)
- throws TException {
- LOGGER.info("host: {}, port: {}, clusterIntpProcParameters: {}",
- host, port, clusterIntpProcParameters);
-
- try (TTransport transport = new TSocket(host, port)) {
- transport.open();
- TProtocol protocol = new TBinaryProtocol(transport);
- ClusterManagerService.Client client = new ClusterManagerService.Client(protocol);
-
- return client.createClusterInterpreterProcess(clusterIntpProcParameters);
- }
- }
-
- @Override
- public boolean createClusterInterpreterProcess(ClusterIntpProcParameters clusterIntpProcParameters) {
- // TODO: ZEPPELIN-3623
-
- return true;
- }
-
// Obtain the server node whose resources are idle in the cluster
public HashMap<String, Object> getIdleNodeMeta() {
HashMap<String, Object> idleNodeMeta = null;
- HashMap<String, HashMap<String, Object>> clusterMeta = getClusterMeta(ServerMeta, "");
+ HashMap<String, HashMap<String, Object>> clusterMeta = getClusterMeta(SERVER_META, "");
long memoryIdle = 0;
for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
@@ -344,4 +272,56 @@ public class ClusterManagerServer extends ClusterManager
return idleNodeMeta;
}
+
+ public void unicastClusterEvent(String host, int port, String msg) {
+ LOGGER.info("send unicastClusterEvent message {}", msg);
+
+ Address address = Address.from(host, port);
+ CompletableFuture<byte[]> response = messagingService.sendAndReceive(address,
+ ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
+ response.whenComplete((r, e) -> {
+ if (null == e) {
+ LOGGER.error(e.getMessage(), e);
+ } else {
+ LOGGER.info("unicastClusterEvent success! {}", msg);
+ }
+ });
+ }
+
+ public void broadcastClusterEvent(String msg) {
+ LOGGER.info("send broadcastClusterEvent message {}", msg);
+
+ for (Node node : clusterNodes) {
+ if (StringUtils.equals(node.address().host(), zeplServerHost)
+ && node.address().port() == raftServerPort) {
+ // skip myself
+ continue;
+ }
+
+ CompletableFuture<byte[]> response = messagingService.sendAndReceive(node.address(),
+ ZEPL_CLUSTER_EVENT_TOPIC, msg.getBytes(), Duration.ofSeconds(2));
+ response.whenComplete((r, e) -> {
+ if (null == e) {
+ LOGGER.error(e.getMessage(), e);
+ } else {
+ LOGGER.info("broadcastClusterNoteEvent success! {}", msg);
+ }
+ });
+ }
+ }
+
+ private BiFunction<Address, byte[], byte[]> subscribeClusterEvent = (address, data) -> {
+ String message = new String(data);
+ LOGGER.info("subscribeClusterEvent() {}", message);
+
+ for (ClusterEventListener eventListener : clusterEventListeners) {
+ eventListener.onClusterEvent(message);
+ }
+
+ return null;
+ };
+
+ public void addClusterEventListeners(ClusterEventListener listener) {
+ clusterEventListeners.add(listener);
+ }
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
index 86fa6a7..4fe8d98 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
@@ -27,8 +27,8 @@ import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
-import static org.apache.zeppelin.cluster.meta.ClusterMetaType.ServerMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.SERVER_META;
/**
* cluster monitoring
@@ -94,11 +94,11 @@ public class ClusterMonitor {
public void run() {
while (running.get()) {
switch (clusterMetaType) {
- case ServerMeta:
+ case SERVER_META:
sendMachineUsage();
checkHealthy();
break;
- case IntpProcessMeta:
+ case INTP_PROCESS_META:
sendHeartbeat();
break;
default:
@@ -136,6 +136,10 @@ public class ClusterMonitor {
Map<String, HashMap<String, Object>> clusterMeta
= clusterManager.getClusterMeta(metaType, "");
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("clusterMeta : {}", clusterMeta);
+ }
+
for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
String key = entry.getKey();
Map<String, Object> meta = entry.getValue();
@@ -172,7 +176,7 @@ public class ClusterMonitor {
mapMonitorUtil.put(ClusterMeta.HEARTBEAT, new Date());
mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
- clusterManager.putClusterMeta(IntpProcessMeta, metaKey, mapMonitorUtil);
+ clusterManager.putClusterMeta(INTP_PROCESS_META, metaKey, mapMonitorUtil);
}
// send the usage of each service
@@ -212,7 +216,7 @@ public class ClusterMonitor {
mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
String clusterName = clusterManager.getClusterNodeName();
- clusterManager.putClusterMeta(ServerMeta, clusterName, mapMonitorUtil);
+ clusterManager.putClusterMeta(SERVER_META, clusterName, mapMonitorUtil);
}
private UsageUtil getMachineUsage() {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
index 460f6ac..dc07daa 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java
@@ -90,12 +90,12 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
logger.debug("ClusterStateMachine.backup()");
}
- // backup ServerMeta
+ // backup SERVER_META
// cluster meta map struct
// cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
Map<String, Map<String, Object>> mapServerMeta
- = clusterMeta.get(ClusterMetaType.ServerMeta, "");
- // write all ServerMeta size
+ = clusterMeta.get(ClusterMetaType.SERVER_META, "");
+ // write all SERVER_META size
writer.writeInt(mapServerMeta.size());
for (Map.Entry<String, Map<String, Object>> entry : mapServerMeta.entrySet()) {
// write cluster_name
@@ -111,11 +111,11 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
}
}
- // backup IntpProcessMeta
+ // backup INTP_PROCESS_META
// Interpreter meta map struct
// IntpGroupId -> {server_tserver_host,server_tserver_port,...}
Map<String, Map<String, Object>> mapIntpProcMeta
- = clusterMeta.get(ClusterMetaType.IntpProcessMeta, "");
+ = clusterMeta.get(ClusterMetaType.INTP_PROCESS_META, "");
// write interpreter size
writer.writeInt(mapIntpProcMeta.size());
for (Map.Entry<String, Map<String, Object>> entry : mapIntpProcMeta.entrySet()) {
@@ -140,7 +140,7 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
}
clusterMeta = new ClusterMeta();
- // read all ServerMeta size
+ // read all SERVER_META size
int nServerMeta = reader.readInt();
for (int i = 0; i < nServerMeta; i++) {
// read cluster_name
@@ -153,12 +153,12 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
String key = reader.readString();
Object value = reader.readObject();
- clusterMeta.put(ClusterMetaType.ServerMeta,
+ clusterMeta.put(ClusterMetaType.SERVER_META,
clusterName, Maps.immutableEntry(key, value));
}
}
- // read all IntpProcessMeta size
+ // read all INTP_PROCESS_META size
int nIntpMeta = reader.readInt();
for (int i = 0; i < nIntpMeta; i++) {
// read interpreter name
@@ -171,7 +171,7 @@ public class ClusterStateMachine extends AbstractPrimitiveService {
String key = reader.readString();
Object value = reader.readObject();
- clusterMeta.put(ClusterMetaType.IntpProcessMeta,
+ clusterMeta.put(ClusterMetaType.INTP_PROCESS_META,
intpName, Maps.immutableEntry(key, value));
}
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
similarity index 86%
copy from zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
copy to zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
index c6229bd..0e1120c 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEvent.java
@@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.cluster.meta;
+package org.apache.zeppelin.cluster.event;
/**
- * Type of cluster metadata
+ * Cluster Event
*/
-public enum ClusterMetaType {
- ServerMeta,
- IntpProcessMeta
+public enum ClusterEvent {
+ CREATE_INTP_PROCESS
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
similarity index 67%
copy from zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
copy to zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
index c6229bd..00c0b3d 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterEventListener.java
@@ -14,12 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.zeppelin.cluster.meta;
+package org.apache.zeppelin.cluster.event;
/**
- * Type of cluster metadata
+ * Listen for the NEW_NOTE、DEL_NOTE、REMOVE_NOTE_TO_TRASH ... event
+ * of the notebook in the NotebookServer#onMessage() function.
*/
-public enum ClusterMetaType {
- ServerMeta,
- IntpProcessMeta
+public interface ClusterEventListener {
+ public static final String CLUSTER_EVENT = "CLUSTER_EVENT";
+ public static final String CLUSTER_EVENT_MSG = "CLUSTER_EVENT_MSG";
+
+ void onClusterEvent(String msg);
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
index a26007c..e862635 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -37,8 +37,6 @@ public class ClusterMeta implements Serializable {
// zeppelin-server meta
public static String SERVER_HOST = "SERVER_HOST";
public static String SERVER_PORT = "SERVER_PORT";
- public static String SERVER_TSERVER_HOST = "SERVER_TSERVER_HOST";
- public static String SERVER_TSERVER_PORT = "SERVER_TSERVER_PORT";
public static String SERVER_START_TIME = "SERVER_START_TIME";
// interperter-process meta
@@ -73,7 +71,7 @@ public class ClusterMeta implements Serializable {
Map<String, Object> mapValue = (Map<String, Object>) value;
switch (type) {
- case ServerMeta:
+ case SERVER_META:
// Because it may be partially updated metadata information
if (mapServerMeta.containsKey(key)) {
Map<String, Object> values = mapServerMeta.get(key);
@@ -82,7 +80,7 @@ public class ClusterMeta implements Serializable {
mapServerMeta.put(key, mapValue);
}
break;
- case IntpProcessMeta:
+ case INTP_PROCESS_META:
if (mapInterpreterMeta.containsKey(key)) {
Map<String, Object> values = mapInterpreterMeta.get(key);
values.putAll(mapValue);
@@ -97,7 +95,7 @@ public class ClusterMeta implements Serializable {
Map<String, Object> values = null;
switch (type) {
- case ServerMeta:
+ case SERVER_META:
if (null == key || StringUtils.isEmpty(key)) {
return mapServerMeta;
}
@@ -107,7 +105,7 @@ public class ClusterMeta implements Serializable {
logger.warn("can not find key : {}", key);
}
break;
- case IntpProcessMeta:
+ case INTP_PROCESS_META:
if (null == key || StringUtils.isEmpty(key)) {
return mapInterpreterMeta;
}
@@ -127,14 +125,14 @@ public class ClusterMeta implements Serializable {
public Map<String, Object> remove(ClusterMetaType type, String key) {
switch (type) {
- case ServerMeta:
+ case SERVER_META:
if (mapServerMeta.containsKey(key)) {
return mapServerMeta.remove(key);
} else {
logger.warn("can not find key : {}", key);
}
break;
- case IntpProcessMeta:
+ case INTP_PROCESS_META:
if (mapInterpreterMeta.containsKey(key)) {
return mapInterpreterMeta.remove(key);
} else {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
index c6229bd..cf995f5 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaType.java
@@ -20,6 +20,6 @@ package org.apache.zeppelin.cluster.meta;
* Type of cluster metadata
*/
public enum ClusterMetaType {
- ServerMeta,
- IntpProcessMeta
+ SERVER_META,
+ INTP_PROCESS_META
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index be792c7..d4824dc 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.conf;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
@@ -28,6 +29,7 @@ import java.util.function.Predicate;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.configuration.tree.ConfigurationNode;
+import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.util.Util;
import org.slf4j.Logger;
@@ -120,6 +122,37 @@ public class ZeppelinConfiguration extends XMLConfiguration {
}
if (url == null) {
+ try {
+ Map procEnv = EnvironmentUtils.getProcEnvironment();
+ if (procEnv.containsKey("ZEPPELIN_HOME")) {
+ String zconfDir = (String) procEnv.get("ZEPPELIN_HOME");
+ File file = new File(zconfDir + File.separator
+ + "conf" + File.separator + ZEPPELIN_SITE_XML);
+ if (file.exists()) {
+ url = file.toURL();
+ }
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ if (url == null) {
+ try {
+ Map procEnv = EnvironmentUtils.getProcEnvironment();
+ if (procEnv.containsKey("ZEPPELIN_CONF_DIR")) {
+ String zconfDir = (String) procEnv.get("ZEPPELIN_CONF_DIR");
+ File file = new File(zconfDir + File.separator + ZEPPELIN_SITE_XML);
+ if (file.exists()) {
+ url = file.toURL();
+ }
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ if (url == null) {
LOG.warn("Failed to load configuration, proceeding with a default");
conf = new ZeppelinConfiguration();
} else {
@@ -144,7 +177,6 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return conf;
}
-
private String getStringValue(String name, String d) {
String value = this.properties.get(name);
if (value != null) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 1d4d231..3dc41f4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -29,6 +29,10 @@ import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.apache.zeppelin.cluster.ClusterManagerClient;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -85,6 +89,7 @@ import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -132,6 +137,10 @@ public class RemoteInterpreterServer extends Thread
private boolean isTest;
+ // cluster manager client
+ ClusterManagerClient clusterManagerClient = ClusterManagerClient.getInstance();
+ ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+
public RemoteInterpreterServer(String intpEventServerHost,
int intpEventServerPort,
String interpreterGroupId,
@@ -178,6 +187,8 @@ public class RemoteInterpreterServer extends Thread
server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport).processor(processor));
remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
+
+ clusterManagerClient.start(interpreterGroupId);
}
@Override
@@ -196,16 +207,24 @@ public class RemoteInterpreterServer extends Thread
}
}
- if (!interrupted) {
- RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
- try {
- intpEventServiceClient.registerInterpreterProcess(registerInfo);
- } catch (TException e) {
- logger.error("Error while registering interpreter: {}", registerInfo, e);
+ if (zconf.isClusterMode()) {
+ // Cluster mode, discovering interpreter processes through metadata registration
+ // TODO (Xun): Unified use of cluster metadata for process discovery of all operating modes
+ // 1. Can optimize the startup logic of the process
+ // 2. Can solve the problem that running the interpreter's IP in docker may be a virtual IP
+ putClusterMeta();
+ } else {
+ if (!interrupted) {
+ RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
try {
- shutdown();
- } catch (TException e1) {
- logger.warn("Exception occurs while shutting down", e1);
+ intpEventServiceClient.registerInterpreterProcess(registerInfo);
+ } catch (TException e) {
+ logger.error("Error while registering interpreter: {}", registerInfo, e);
+ try {
+ shutdown();
+ } catch (TException e1) {
+ logger.warn("Exception occurs while shutting down", e1);
+ }
}
}
}
@@ -303,6 +322,26 @@ public class RemoteInterpreterServer extends Thread
System.exit(0);
}
+ // Submit interpreter process metadata information to cluster metadata
+ private void putClusterMeta() {
+ if (!zconf.isClusterMode()){
+ return;
+ }
+ String nodeName = clusterManagerClient.getClusterNodeName();
+
+ // commit interpreter meta
+ HashMap<String, Object> meta = new HashMap<>();
+ meta.put(ClusterMeta.NODE_NAME, nodeName);
+ meta.put(ClusterMeta.INTP_PROCESS_ID, interpreterGroupId);
+ meta.put(ClusterMeta.INTP_TSERVER_HOST, host);
+ meta.put(ClusterMeta.INTP_TSERVER_PORT, port);
+ meta.put(ClusterMeta.INTP_START_TIME, new Date());
+ meta.put(ClusterMeta.HEARTBEAT, new Date());
+ meta.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
+
+ clusterManagerClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, interpreterGroupId, meta);
+ }
+
@Override
public void createInterpreter(String interpreterGroupId, String sessionId, String
className, Map<String, String> properties, String userName) throws TException {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java
deleted file mode 100644
index 4cd82f0..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java
+++ /dev/null
@@ -1,914 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.12.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.zeppelin.interpreter.thrift;
-
-@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-06-10")
-public class ClusterIntpProcParameters implements org.apache.thrift.TBase<ClusterIntpProcParameters, ClusterIntpProcParameters._Fields>, java.io.Serializable, Cloneable, Comparable<ClusterIntpProcParameters> {
- private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ClusterIntpProcParameters");
-
- private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1);
- private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2);
- private static final org.apache.thrift.protocol.TField USER_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("userName", org.apache.thrift.protocol.TType.STRING, (short)3);
- private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)4);
- private static final org.apache.thrift.protocol.TField REPL_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("replName", org.apache.thrift.protocol.TType.STRING, (short)5);
- private static final org.apache.thrift.protocol.TField DEFAULT_INTERPRETER_SETTING_FIELD_DESC = new org.apache.thrift.protocol.TField("defaultInterpreterSetting", org.apache.thrift.protocol.TType.STRING, (short)6);
-
- private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ClusterIntpProcParametersStandardSchemeFactory();
- private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ClusterIntpProcParametersTupleSchemeFactory();
-
- public @org.apache.thrift.annotation.Nullable java.lang.String host; // required
- public int port; // required
- public @org.apache.thrift.annotation.Nullable java.lang.String userName; // required
- public @org.apache.thrift.annotation.Nullable java.lang.String noteId; // required
- public @org.apache.thrift.annotation.Nullable java.lang.String replName; // required
- public @org.apache.thrift.annotation.Nullable java.lang.String defaultInterpreterSetting; // required
-
- /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
- public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- HOST((short)1, "host"),
- PORT((short)2, "port"),
- USER_NAME((short)3, "userName"),
- NOTE_ID((short)4, "noteId"),
- REPL_NAME((short)5, "replName"),
- DEFAULT_INTERPRETER_SETTING((short)6, "defaultInterpreterSetting");
-
- private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
-
- static {
- for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
- byName.put(field.getFieldName(), field);
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, or null if its not found.
- */
- @org.apache.thrift.annotation.Nullable
- public static _Fields findByThriftId(int fieldId) {
- switch(fieldId) {
- case 1: // HOST
- return HOST;
- case 2: // PORT
- return PORT;
- case 3: // USER_NAME
- return USER_NAME;
- case 4: // NOTE_ID
- return NOTE_ID;
- case 5: // REPL_NAME
- return REPL_NAME;
- case 6: // DEFAULT_INTERPRETER_SETTING
- return DEFAULT_INTERPRETER_SETTING;
- default:
- return null;
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, throwing an exception
- * if it is not found.
- */
- public static _Fields findByThriftIdOrThrow(int fieldId) {
- _Fields fields = findByThriftId(fieldId);
- if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
- return fields;
- }
-
- /**
- * Find the _Fields constant that matches name, or null if its not found.
- */
- @org.apache.thrift.annotation.Nullable
- public static _Fields findByName(java.lang.String name) {
- return byName.get(name);
- }
-
- private final short _thriftId;
- private final java.lang.String _fieldName;
-
- _Fields(short thriftId, java.lang.String fieldName) {
- _thriftId = thriftId;
- _fieldName = fieldName;
- }
-
- public short getThriftFieldId() {
- return _thriftId;
- }
-
- public java.lang.String getFieldName() {
- return _fieldName;
- }
- }
-
- // isset id assignments
- private static final int __PORT_ISSET_ID = 0;
- private byte __isset_bitfield = 0;
- public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
- static {
- java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
- tmpMap.put(_Fields.USER_NAME, new org.apache.thrift.meta_data.FieldMetaData("userName", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- tmpMap.put(_Fields.REPL_NAME, new org.apache.thrift.meta_data.FieldMetaData("replName", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- tmpMap.put(_Fields.DEFAULT_INTERPRETER_SETTING, new org.apache.thrift.meta_data.FieldMetaData("defaultInterpreterSetting", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
- org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterIntpProcParameters.class, metaDataMap);
- }
-
- public ClusterIntpProcParameters() {
- }
-
- public ClusterIntpProcParameters(
- java.lang.String host,
- int port,
- java.lang.String userName,
- java.lang.String noteId,
- java.lang.String replName,
- java.lang.String defaultInterpreterSetting)
- {
- this();
- this.host = host;
- this.port = port;
- setPortIsSet(true);
- this.userName = userName;
- this.noteId = noteId;
- this.replName = replName;
- this.defaultInterpreterSetting = defaultInterpreterSetting;
- }
-
- /**
- * Performs a deep copy on <i>other</i>.
- */
- public ClusterIntpProcParameters(ClusterIntpProcParameters other) {
- __isset_bitfield = other.__isset_bitfield;
- if (other.isSetHost()) {
- this.host = other.host;
- }
- this.port = other.port;
- if (other.isSetUserName()) {
- this.userName = other.userName;
- }
- if (other.isSetNoteId()) {
- this.noteId = other.noteId;
- }
- if (other.isSetReplName()) {
- this.replName = other.replName;
- }
- if (other.isSetDefaultInterpreterSetting()) {
- this.defaultInterpreterSetting = other.defaultInterpreterSetting;
- }
- }
-
- public ClusterIntpProcParameters deepCopy() {
- return new ClusterIntpProcParameters(this);
- }
-
- @Override
- public void clear() {
- this.host = null;
- setPortIsSet(false);
- this.port = 0;
- this.userName = null;
- this.noteId = null;
- this.replName = null;
- this.defaultInterpreterSetting = null;
- }
-
- @org.apache.thrift.annotation.Nullable
- public java.lang.String getHost() {
- return this.host;
- }
-
- public ClusterIntpProcParameters setHost(@org.apache.thrift.annotation.Nullable java.lang.String host) {
- this.host = host;
- return this;
- }
-
- public void unsetHost() {
- this.host = null;
- }
-
- /** Returns true if field host is set (has been assigned a value) and false otherwise */
- public boolean isSetHost() {
- return this.host != null;
- }
-
- public void setHostIsSet(boolean value) {
- if (!value) {
- this.host = null;
- }
- }
-
- public int getPort() {
- return this.port;
- }
-
- public ClusterIntpProcParameters setPort(int port) {
- this.port = port;
- setPortIsSet(true);
- return this;
- }
-
- public void unsetPort() {
- __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __PORT_ISSET_ID);
- }
-
- /** Returns true if field port is set (has been assigned a value) and false otherwise */
- public boolean isSetPort() {
- return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
- }
-
- public void setPortIsSet(boolean value) {
- __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, value);
- }
-
- @org.apache.thrift.annotation.Nullable
- public java.lang.String getUserName() {
- return this.userName;
- }
-
- public ClusterIntpProcParameters setUserName(@org.apache.thrift.annotation.Nullable java.lang.String userName) {
- this.userName = userName;
- return this;
- }
-
- public void unsetUserName() {
- this.userName = null;
- }
-
- /** Returns true if field userName is set (has been assigned a value) and false otherwise */
- public boolean isSetUserName() {
- return this.userName != null;
- }
-
- public void setUserNameIsSet(boolean value) {
- if (!value) {
- this.userName = null;
- }
- }
-
- @org.apache.thrift.annotation.Nullable
- public java.lang.String getNoteId() {
- return this.noteId;
- }
-
- public ClusterIntpProcParameters setNoteId(@org.apache.thrift.annotation.Nullable java.lang.String noteId) {
- this.noteId = noteId;
- return this;
- }
-
- public void unsetNoteId() {
- this.noteId = null;
- }
-
- /** Returns true if field noteId is set (has been assigned a value) and false otherwise */
- public boolean isSetNoteId() {
- return this.noteId != null;
- }
-
- public void setNoteIdIsSet(boolean value) {
- if (!value) {
- this.noteId = null;
- }
- }
-
- @org.apache.thrift.annotation.Nullable
- public java.lang.String getReplName() {
- return this.replName;
- }
-
- public ClusterIntpProcParameters setReplName(@org.apache.thrift.annotation.Nullable java.lang.String replName) {
- this.replName = replName;
- return this;
- }
-
- public void unsetReplName() {
- this.replName = null;
- }
-
- /** Returns true if field replName is set (has been assigned a value) and false otherwise */
- public boolean isSetReplName() {
- return this.replName != null;
- }
-
- public void setReplNameIsSet(boolean value) {
- if (!value) {
- this.replName = null;
- }
- }
-
- @org.apache.thrift.annotation.Nullable
- public java.lang.String getDefaultInterpreterSetting() {
- return this.defaultInterpreterSetting;
- }
-
- public ClusterIntpProcParameters setDefaultInterpreterSetting(@org.apache.thrift.annotation.Nullable java.lang.String defaultInterpreterSetting) {
- this.defaultInterpreterSetting = defaultInterpreterSetting;
- return this;
- }
-
- public void unsetDefaultInterpreterSetting() {
- this.defaultInterpreterSetting = null;
- }
-
- /** Returns true if field defaultInterpreterSetting is set (has been assigned a value) and false otherwise */
- public boolean isSetDefaultInterpreterSetting() {
- return this.defaultInterpreterSetting != null;
- }
-
- public void setDefaultInterpreterSettingIsSet(boolean value) {
- if (!value) {
- this.defaultInterpreterSetting = null;
- }
- }
-
- public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
- switch (field) {
- case HOST:
- if (value == null) {
- unsetHost();
- } else {
- setHost((java.lang.String)value);
- }
- break;
-
- case PORT:
- if (value == null) {
- unsetPort();
- } else {
- setPort((java.lang.Integer)value);
- }
- break;
-
- case USER_NAME:
- if (value == null) {
- unsetUserName();
- } else {
- setUserName((java.lang.String)value);
- }
- break;
-
- case NOTE_ID:
- if (value == null) {
- unsetNoteId();
- } else {
- setNoteId((java.lang.String)value);
- }
- break;
-
- case REPL_NAME:
- if (value == null) {
- unsetReplName();
- } else {
- setReplName((java.lang.String)value);
- }
- break;
-
- case DEFAULT_INTERPRETER_SETTING:
- if (value == null) {
- unsetDefaultInterpreterSetting();
- } else {
- setDefaultInterpreterSetting((java.lang.String)value);
- }
- break;
-
- }
- }
-
- @org.apache.thrift.annotation.Nullable
- public java.lang.Object getFieldValue(_Fields field) {
- switch (field) {
- case HOST:
- return getHost();
-
- case PORT:
- return getPort();
-
- case USER_NAME:
- return getUserName();
-
- case NOTE_ID:
- return getNoteId();
-
- case REPL_NAME:
- return getReplName();
-
- case DEFAULT_INTERPRETER_SETTING:
- return getDefaultInterpreterSetting();
-
- }
- throw new java.lang.IllegalStateException();
- }
-
- /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
- public boolean isSet(_Fields field) {
- if (field == null) {
- throw new java.lang.IllegalArgumentException();
- }
-
- switch (field) {
- case HOST:
- return isSetHost();
- case PORT:
- return isSetPort();
- case USER_NAME:
- return isSetUserName();
- case NOTE_ID:
- return isSetNoteId();
- case REPL_NAME:
- return isSetReplName();
- case DEFAULT_INTERPRETER_SETTING:
- return isSetDefaultInterpreterSetting();
- }
- throw new java.lang.IllegalStateException();
- }
-
- @Override
- public boolean equals(java.lang.Object that) {
- if (that == null)
- return false;
- if (that instanceof ClusterIntpProcParameters)
- return this.equals((ClusterIntpProcParameters)that);
- return false;
- }
-
- public boolean equals(ClusterIntpProcParameters that) {
- if (that == null)
- return false;
- if (this == that)
- return true;
-
- boolean this_present_host = true && this.isSetHost();
- boolean that_present_host = true && that.isSetHost();
- if (this_present_host || that_present_host) {
- if (!(this_present_host && that_present_host))
- return false;
- if (!this.host.equals(that.host))
- return false;
- }
-
- boolean this_present_port = true;
- boolean that_present_port = true;
- if (this_present_port || that_present_port) {
- if (!(this_present_port && that_present_port))
- return false;
- if (this.port != that.port)
- return false;
- }
-
- boolean this_present_userName = true && this.isSetUserName();
- boolean that_present_userName = true && that.isSetUserName();
- if (this_present_userName || that_present_userName) {
- if (!(this_present_userName && that_present_userName))
- return false;
- if (!this.userName.equals(that.userName))
- return false;
- }
-
- boolean this_present_noteId = true && this.isSetNoteId();
- boolean that_present_noteId = true && that.isSetNoteId();
- if (this_present_noteId || that_present_noteId) {
- if (!(this_present_noteId && that_present_noteId))
- return false;
- if (!this.noteId.equals(that.noteId))
- return false;
- }
-
- boolean this_present_replName = true && this.isSetReplName();
- boolean that_present_replName = true && that.isSetReplName();
- if (this_present_replName || that_present_replName) {
- if (!(this_present_replName && that_present_replName))
- return false;
- if (!this.replName.equals(that.replName))
- return false;
- }
-
- boolean this_present_defaultInterpreterSetting = true && this.isSetDefaultInterpreterSetting();
- boolean that_present_defaultInterpreterSetting = true && that.isSetDefaultInterpreterSetting();
- if (this_present_defaultInterpreterSetting || that_present_defaultInterpreterSetting) {
- if (!(this_present_defaultInterpreterSetting && that_present_defaultInterpreterSetting))
- return false;
- if (!this.defaultInterpreterSetting.equals(that.defaultInterpreterSetting))
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int hashCode = 1;
-
- hashCode = hashCode * 8191 + ((isSetHost()) ? 131071 : 524287);
- if (isSetHost())
- hashCode = hashCode * 8191 + host.hashCode();
-
- hashCode = hashCode * 8191 + port;
-
- hashCode = hashCode * 8191 + ((isSetUserName()) ? 131071 : 524287);
- if (isSetUserName())
- hashCode = hashCode * 8191 + userName.hashCode();
-
- hashCode = hashCode * 8191 + ((isSetNoteId()) ? 131071 : 524287);
- if (isSetNoteId())
- hashCode = hashCode * 8191 + noteId.hashCode();
-
- hashCode = hashCode * 8191 + ((isSetReplName()) ? 131071 : 524287);
- if (isSetReplName())
- hashCode = hashCode * 8191 + replName.hashCode();
-
- hashCode = hashCode * 8191 + ((isSetDefaultInterpreterSetting()) ? 131071 : 524287);
- if (isSetDefaultInterpreterSetting())
- hashCode = hashCode * 8191 + defaultInterpreterSetting.hashCode();
-
- return hashCode;
- }
-
- @Override
- public int compareTo(ClusterIntpProcParameters other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
- int lastComparison = 0;
-
- lastComparison = java.lang.Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetHost()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, other.host);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = java.lang.Boolean.valueOf(isSetPort()).compareTo(other.isSetPort());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetPort()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, other.port);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = java.lang.Boolean.valueOf(isSetUserName()).compareTo(other.isSetUserName());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetUserName()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.userName, other.userName);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = java.lang.Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetNoteId()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = java.lang.Boolean.valueOf(isSetReplName()).compareTo(other.isSetReplName());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetReplName()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replName, other.replName);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = java.lang.Boolean.valueOf(isSetDefaultInterpreterSetting()).compareTo(other.isSetDefaultInterpreterSetting());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetDefaultInterpreterSetting()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.defaultInterpreterSetting, other.defaultInterpreterSetting);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- return 0;
- }
-
- @org.apache.thrift.annotation.Nullable
- public _Fields fieldForId(int fieldId) {
- return _Fields.findByThriftId(fieldId);
- }
-
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- scheme(iprot).read(iprot, this);
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- scheme(oprot).write(oprot, this);
- }
-
- @Override
- public java.lang.String toString() {
- java.lang.StringBuilder sb = new java.lang.StringBuilder("ClusterIntpProcParameters(");
- boolean first = true;
-
- sb.append("host:");
- if (this.host == null) {
- sb.append("null");
- } else {
- sb.append(this.host);
- }
- first = false;
- if (!first) sb.append(", ");
- sb.append("port:");
- sb.append(this.port);
- first = false;
- if (!first) sb.append(", ");
- sb.append("userName:");
- if (this.userName == null) {
- sb.append("null");
- } else {
- sb.append(this.userName);
- }
- first = false;
- if (!first) sb.append(", ");
- sb.append("noteId:");
- if (this.noteId == null) {
- sb.append("null");
- } else {
- sb.append(this.noteId);
- }
- first = false;
- if (!first) sb.append(", ");
- sb.append("replName:");
- if (this.replName == null) {
- sb.append("null");
- } else {
- sb.append(this.replName);
- }
- first = false;
- if (!first) sb.append(", ");
- sb.append("defaultInterpreterSetting:");
- if (this.defaultInterpreterSetting == null) {
- sb.append("null");
- } else {
- sb.append(this.defaultInterpreterSetting);
- }
- first = false;
- sb.append(")");
- return sb.toString();
- }
-
- public void validate() throws org.apache.thrift.TException {
- // check for required fields
- // check for sub-struct validity
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
- try {
- write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
- try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bitfield = 0;
- read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private static class ClusterIntpProcParametersStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
- public ClusterIntpProcParametersStandardScheme getScheme() {
- return new ClusterIntpProcParametersStandardScheme();
- }
- }
-
- private static class ClusterIntpProcParametersStandardScheme extends org.apache.thrift.scheme.StandardScheme<ClusterIntpProcParameters> {
-
- public void read(org.apache.thrift.protocol.TProtocol iprot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField schemeField;
- iprot.readStructBegin();
- while (true)
- {
- schemeField = iprot.readFieldBegin();
- if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (schemeField.id) {
- case 1: // HOST
- if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.host = iprot.readString();
- struct.setHostIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 2: // PORT
- if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
- struct.port = iprot.readI32();
- struct.setPortIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 3: // USER_NAME
- if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.userName = iprot.readString();
- struct.setUserNameIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 4: // NOTE_ID
- if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.noteId = iprot.readString();
- struct.setNoteIdIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 5: // REPL_NAME
- if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.replName = iprot.readString();
- struct.setReplNameIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 6: // DEFAULT_INTERPRETER_SETTING
- if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.defaultInterpreterSetting = iprot.readString();
- struct.setDefaultInterpreterSettingIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- struct.validate();
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
- struct.validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- if (struct.host != null) {
- oprot.writeFieldBegin(HOST_FIELD_DESC);
- oprot.writeString(struct.host);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldBegin(PORT_FIELD_DESC);
- oprot.writeI32(struct.port);
- oprot.writeFieldEnd();
- if (struct.userName != null) {
- oprot.writeFieldBegin(USER_NAME_FIELD_DESC);
- oprot.writeString(struct.userName);
- oprot.writeFieldEnd();
- }
- if (struct.noteId != null) {
- oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
- oprot.writeString(struct.noteId);
- oprot.writeFieldEnd();
- }
- if (struct.replName != null) {
- oprot.writeFieldBegin(REPL_NAME_FIELD_DESC);
- oprot.writeString(struct.replName);
- oprot.writeFieldEnd();
- }
- if (struct.defaultInterpreterSetting != null) {
- oprot.writeFieldBegin(DEFAULT_INTERPRETER_SETTING_FIELD_DESC);
- oprot.writeString(struct.defaultInterpreterSetting);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
-
- }
-
- private static class ClusterIntpProcParametersTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
- public ClusterIntpProcParametersTupleScheme getScheme() {
- return new ClusterIntpProcParametersTupleScheme();
- }
- }
-
- private static class ClusterIntpProcParametersTupleScheme extends org.apache.thrift.scheme.TupleScheme<ClusterIntpProcParameters> {
-
- @Override
- public void write(org.apache.thrift.protocol.TProtocol prot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet optionals = new java.util.BitSet();
- if (struct.isSetHost()) {
- optionals.set(0);
- }
- if (struct.isSetPort()) {
- optionals.set(1);
- }
- if (struct.isSetUserName()) {
- optionals.set(2);
- }
- if (struct.isSetNoteId()) {
- optionals.set(3);
- }
- if (struct.isSetReplName()) {
- optionals.set(4);
- }
- if (struct.isSetDefaultInterpreterSetting()) {
- optionals.set(5);
- }
- oprot.writeBitSet(optionals, 6);
- if (struct.isSetHost()) {
- oprot.writeString(struct.host);
- }
- if (struct.isSetPort()) {
- oprot.writeI32(struct.port);
- }
- if (struct.isSetUserName()) {
- oprot.writeString(struct.userName);
- }
- if (struct.isSetNoteId()) {
- oprot.writeString(struct.noteId);
- }
- if (struct.isSetReplName()) {
- oprot.writeString(struct.replName);
- }
- if (struct.isSetDefaultInterpreterSetting()) {
- oprot.writeString(struct.defaultInterpreterSetting);
- }
- }
-
- @Override
- public void read(org.apache.thrift.protocol.TProtocol prot, ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet incoming = iprot.readBitSet(6);
- if (incoming.get(0)) {
- struct.host = iprot.readString();
- struct.setHostIsSet(true);
- }
- if (incoming.get(1)) {
- struct.port = iprot.readI32();
- struct.setPortIsSet(true);
- }
- if (incoming.get(2)) {
- struct.userName = iprot.readString();
- struct.setUserNameIsSet(true);
- }
- if (incoming.get(3)) {
- struct.noteId = iprot.readString();
- struct.setNoteIdIsSet(true);
- }
- if (incoming.get(4)) {
- struct.replName = iprot.readString();
- struct.setReplNameIsSet(true);
- }
- if (incoming.get(5)) {
- struct.defaultInterpreterSetting = iprot.readString();
- struct.setDefaultInterpreterSettingIsSet(true);
- }
- }
- }
-
- private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
- return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
- }
-}
-
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java
deleted file mode 100644
index 5c46b0e..0000000
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java
+++ /dev/null
@@ -1,995 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.12.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.zeppelin.interpreter.thrift;
-
-@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-06-10")
-public class ClusterManagerService {
-
- public interface Iface {
-
- public boolean createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException;
-
- }
-
- public interface AsyncIface {
-
- public void createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException;
-
- }
-
- public static class Client extends org.apache.thrift.TServiceClient implements Iface {
- public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
- public Factory() {}
- public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
- return new Client(prot);
- }
- public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
- return new Client(iprot, oprot);
- }
- }
-
- public Client(org.apache.thrift.protocol.TProtocol prot)
- {
- super(prot, prot);
- }
-
- public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
- super(iprot, oprot);
- }
-
- public boolean createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException
- {
- send_createClusterInterpreterProcess(intpProcParameters);
- return recv_createClusterInterpreterProcess();
- }
-
- public void send_createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters) throws org.apache.thrift.TException
- {
- createClusterInterpreterProcess_args args = new createClusterInterpreterProcess_args();
- args.setIntpProcParameters(intpProcParameters);
- sendBase("createClusterInterpreterProcess", args);
- }
-
- public boolean recv_createClusterInterpreterProcess() throws org.apache.thrift.TException
- {
- createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result();
- receiveBase(result, "createClusterInterpreterProcess");
- if (result.isSetSuccess()) {
- return result.success;
- }
- throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "createClusterInterpreterProcess failed: unknown result");
- }
-
- }
- public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
- public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
- private org.apache.thrift.async.TAsyncClientManager clientManager;
- private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
- public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
- this.clientManager = clientManager;
- this.protocolFactory = protocolFactory;
- }
- public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
- return new AsyncClient(protocolFactory, clientManager, transport);
- }
- }
-
- public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
- super(protocolFactory, clientManager, transport);
- }
-
- public void createClusterInterpreterProcess(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException {
- checkReady();
- createClusterInterpreterProcess_call method_call = new createClusterInterpreterProcess_call(intpProcParameters, resultHandler, this, ___protocolFactory, ___transport);
- this.___currentMethod = method_call;
- ___manager.call(method_call);
- }
-
- public static class createClusterInterpreterProcess_call extends org.apache.thrift.async.TAsyncMethodCall<java.lang.Boolean> {
- private ClusterIntpProcParameters intpProcParameters;
- public createClusterInterpreterProcess_call(ClusterIntpProcParameters intpProcParameters, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
- super(client, protocolFactory, transport, resultHandler, false);
- this.intpProcParameters = intpProcParameters;
- }
-
- public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
- prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("createClusterInterpreterProcess", org.apache.thrift.protocol.TMessageType.CALL, 0));
- createClusterInterpreterProcess_args args = new createClusterInterpreterProcess_args();
- args.setIntpProcParameters(intpProcParameters);
- args.write(prot);
- prot.writeMessageEnd();
- }
-
- public java.lang.Boolean getResult() throws org.apache.thrift.TException {
- if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
- throw new java.lang.IllegalStateException("Method call not finished!");
- }
- org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
- org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
- return (new Client(prot)).recv_createClusterInterpreterProcess();
- }
- }
-
- }
-
- public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
- private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
- public Processor(I iface) {
- super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
- }
-
- protected Processor(I iface, java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
- super(iface, getProcessMap(processMap));
- }
-
- private static <I extends Iface> java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
- processMap.put("createClusterInterpreterProcess", new createClusterInterpreterProcess());
- return processMap;
- }
-
- public static class createClusterInterpreterProcess<I extends Iface> extends org.apache.thrift.ProcessFunction<I, createClusterInterpreterProcess_args> {
- public createClusterInterpreterProcess() {
- super("createClusterInterpreterProcess");
- }
-
- public createClusterInterpreterProcess_args getEmptyArgsInstance() {
- return new createClusterInterpreterProcess_args();
- }
-
- protected boolean isOneway() {
- return false;
- }
-
- @Override
- protected boolean rethrowUnhandledExceptions() {
- return false;
- }
-
- public createClusterInterpreterProcess_result getResult(I iface, createClusterInterpreterProcess_args args) throws org.apache.thrift.TException {
- createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result();
- result.success = iface.createClusterInterpreterProcess(args.intpProcParameters);
- result.setSuccessIsSet(true);
- return result;
- }
- }
-
- }
-
- public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
- private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName());
- public AsyncProcessor(I iface) {
- super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
- }
-
- protected AsyncProcessor(I iface, java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
- super(iface, getProcessMap(processMap));
- }
-
- private static <I extends AsyncIface> java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
- processMap.put("createClusterInterpreterProcess", new createClusterInterpreterProcess());
- return processMap;
- }
-
- public static class createClusterInterpreterProcess<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, createClusterInterpreterProcess_args, java.lang.Boolean> {
- public createClusterInterpreterProcess() {
- super("createClusterInterpreterProcess");
- }
-
- public createClusterInterpreterProcess_args getEmptyArgsInstance() {
- return new createClusterInterpreterProcess_args();
- }
-
- public org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) {
- final org.apache.thrift.AsyncProcessFunction fcall = this;
- return new org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean>() {
- public void onComplete(java.lang.Boolean o) {
- createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result();
- result.success = o;
- result.setSuccessIsSet(true);
- try {
- fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
- } catch (org.apache.thrift.transport.TTransportException e) {
- _LOGGER.error("TTransportException writing to internal frame buffer", e);
- fb.close();
- } catch (java.lang.Exception e) {
- _LOGGER.error("Exception writing to internal frame buffer", e);
- onError(e);
- }
- }
- public void onError(java.lang.Exception e) {
- byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
- org.apache.thrift.TSerializable msg;
- createClusterInterpreterProcess_result result = new createClusterInterpreterProcess_result();
- if (e instanceof org.apache.thrift.transport.TTransportException) {
- _LOGGER.error("TTransportException inside handler", e);
- fb.close();
- return;
- } else if (e instanceof org.apache.thrift.TApplicationException) {
- _LOGGER.error("TApplicationException inside handler", e);
- msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
- msg = (org.apache.thrift.TApplicationException)e;
- } else {
- _LOGGER.error("Exception inside handler", e);
- msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
- msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
- }
- try {
- fcall.sendResponse(fb,msg,msgType,seqid);
- } catch (java.lang.Exception ex) {
- _LOGGER.error("Exception writing to internal frame buffer", ex);
- fb.close();
- }
- }
- };
- }
-
- protected boolean isOneway() {
- return false;
- }
-
- public void start(I iface, createClusterInterpreterProcess_args args, org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean> resultHandler) throws org.apache.thrift.TException {
- iface.createClusterInterpreterProcess(args.intpProcParameters,resultHandler);
- }
- }
-
- }
-
- public static class createClusterInterpreterProcess_args implements org.apache.thrift.TBase<createClusterInterpreterProcess_args, createClusterInterpreterProcess_args._Fields>, java.io.Serializable, Cloneable, Comparable<createClusterInterpreterProcess_args> {
- private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("createClusterInterpreterProcess_args");
-
- private static final org.apache.thrift.protocol.TField INTP_PROC_PARAMETERS_FIELD_DESC = new org.apache.thrift.protocol.TField("intpProcParameters", org.apache.thrift.protocol.TType.STRUCT, (short)1);
-
- private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new createClusterInterpreterProcess_argsStandardSchemeFactory();
- private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new createClusterInterpreterProcess_argsTupleSchemeFactory();
-
- public @org.apache.thrift.annotation.Nullable ClusterIntpProcParameters intpProcParameters; // required
-
- /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
- public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- INTP_PROC_PARAMETERS((short)1, "intpProcParameters");
-
- private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
-
- static {
- for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
- byName.put(field.getFieldName(), field);
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, or null if its not found.
- */
- @org.apache.thrift.annotation.Nullable
- public static _Fields findByThriftId(int fieldId) {
- switch(fieldId) {
- case 1: // INTP_PROC_PARAMETERS
- return INTP_PROC_PARAMETERS;
- default:
- return null;
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, throwing an exception
- * if it is not found.
- */
- public static _Fields findByThriftIdOrThrow(int fieldId) {
- _Fields fields = findByThriftId(fieldId);
- if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
- return fields;
- }
-
- /**
- * Find the _Fields constant that matches name, or null if its not found.
- */
- @org.apache.thrift.annotation.Nullable
- public static _Fields findByName(java.lang.String name) {
- return byName.get(name);
- }
-
- private final short _thriftId;
- private final java.lang.String _fieldName;
-
- _Fields(short thriftId, java.lang.String fieldName) {
- _thriftId = thriftId;
- _fieldName = fieldName;
- }
-
- public short getThriftFieldId() {
- return _thriftId;
- }
-
- public java.lang.String getFieldName() {
- return _fieldName;
- }
- }
-
- // isset id assignments
- public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
- static {
- java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.INTP_PROC_PARAMETERS, new org.apache.thrift.meta_data.FieldMetaData("intpProcParameters", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ClusterIntpProcParameters.class)));
- metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
- org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(createClusterInterpreterProcess_args.class, metaDataMap);
- }
-
- public createClusterInterpreterProcess_args() {
- }
-
- public createClusterInterpreterProcess_args(
- ClusterIntpProcParameters intpProcParameters)
- {
- this();
- this.intpProcParameters = intpProcParameters;
- }
-
- /**
- * Performs a deep copy on <i>other</i>.
- */
- public createClusterInterpreterProcess_args(createClusterInterpreterProcess_args other) {
- if (other.isSetIntpProcParameters()) {
- this.intpProcParameters = new ClusterIntpProcParameters(other.intpProcParameters);
- }
- }
-
- public createClusterInterpreterProcess_args deepCopy() {
- return new createClusterInterpreterProcess_args(this);
- }
-
- @Override
- public void clear() {
- this.intpProcParameters = null;
- }
-
- @org.apache.thrift.annotation.Nullable
- public ClusterIntpProcParameters getIntpProcParameters() {
- return this.intpProcParameters;
- }
-
- public createClusterInterpreterProcess_args setIntpProcParameters(@org.apache.thrift.annotation.Nullable ClusterIntpProcParameters intpProcParameters) {
- this.intpProcParameters = intpProcParameters;
- return this;
- }
-
- public void unsetIntpProcParameters() {
- this.intpProcParameters = null;
- }
-
- /** Returns true if field intpProcParameters is set (has been assigned a value) and false otherwise */
- public boolean isSetIntpProcParameters() {
- return this.intpProcParameters != null;
- }
-
- public void setIntpProcParametersIsSet(boolean value) {
- if (!value) {
- this.intpProcParameters = null;
- }
- }
-
- public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
- switch (field) {
- case INTP_PROC_PARAMETERS:
- if (value == null) {
- unsetIntpProcParameters();
- } else {
- setIntpProcParameters((ClusterIntpProcParameters)value);
- }
- break;
-
- }
- }
-
- @org.apache.thrift.annotation.Nullable
- public java.lang.Object getFieldValue(_Fields field) {
- switch (field) {
- case INTP_PROC_PARAMETERS:
- return getIntpProcParameters();
-
- }
- throw new java.lang.IllegalStateException();
- }
-
- /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
- public boolean isSet(_Fields field) {
- if (field == null) {
- throw new java.lang.IllegalArgumentException();
- }
-
- switch (field) {
- case INTP_PROC_PARAMETERS:
- return isSetIntpProcParameters();
- }
- throw new java.lang.IllegalStateException();
- }
-
- @Override
- public boolean equals(java.lang.Object that) {
- if (that == null)
- return false;
- if (that instanceof createClusterInterpreterProcess_args)
- return this.equals((createClusterInterpreterProcess_args)that);
- return false;
- }
-
- public boolean equals(createClusterInterpreterProcess_args that) {
- if (that == null)
- return false;
- if (this == that)
- return true;
-
- boolean this_present_intpProcParameters = true && this.isSetIntpProcParameters();
- boolean that_present_intpProcParameters = true && that.isSetIntpProcParameters();
- if (this_present_intpProcParameters || that_present_intpProcParameters) {
- if (!(this_present_intpProcParameters && that_present_intpProcParameters))
- return false;
- if (!this.intpProcParameters.equals(that.intpProcParameters))
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int hashCode = 1;
-
- hashCode = hashCode * 8191 + ((isSetIntpProcParameters()) ? 131071 : 524287);
- if (isSetIntpProcParameters())
- hashCode = hashCode * 8191 + intpProcParameters.hashCode();
-
- return hashCode;
- }
-
- @Override
- public int compareTo(createClusterInterpreterProcess_args other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
- int lastComparison = 0;
-
- lastComparison = java.lang.Boolean.valueOf(isSetIntpProcParameters()).compareTo(other.isSetIntpProcParameters());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetIntpProcParameters()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.intpProcParameters, other.intpProcParameters);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- return 0;
- }
-
- @org.apache.thrift.annotation.Nullable
- public _Fields fieldForId(int fieldId) {
- return _Fields.findByThriftId(fieldId);
- }
-
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- scheme(iprot).read(iprot, this);
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- scheme(oprot).write(oprot, this);
- }
-
- @Override
- public java.lang.String toString() {
- java.lang.StringBuilder sb = new java.lang.StringBuilder("createClusterInterpreterProcess_args(");
- boolean first = true;
-
- sb.append("intpProcParameters:");
- if (this.intpProcParameters == null) {
- sb.append("null");
- } else {
- sb.append(this.intpProcParameters);
- }
- first = false;
- sb.append(")");
- return sb.toString();
- }
-
- public void validate() throws org.apache.thrift.TException {
- // check for required fields
- // check for sub-struct validity
- if (intpProcParameters != null) {
- intpProcParameters.validate();
- }
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
- try {
- write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
- try {
- read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private static class createClusterInterpreterProcess_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
- public createClusterInterpreterProcess_argsStandardScheme getScheme() {
- return new createClusterInterpreterProcess_argsStandardScheme();
- }
- }
-
- private static class createClusterInterpreterProcess_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<createClusterInterpreterProcess_args> {
-
- public void read(org.apache.thrift.protocol.TProtocol iprot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField schemeField;
- iprot.readStructBegin();
- while (true)
- {
- schemeField = iprot.readFieldBegin();
- if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (schemeField.id) {
- case 1: // INTP_PROC_PARAMETERS
- if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
- struct.intpProcParameters = new ClusterIntpProcParameters();
- struct.intpProcParameters.read(iprot);
- struct.setIntpProcParametersIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- struct.validate();
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException {
- struct.validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- if (struct.intpProcParameters != null) {
- oprot.writeFieldBegin(INTP_PROC_PARAMETERS_FIELD_DESC);
- struct.intpProcParameters.write(oprot);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
-
- }
-
- private static class createClusterInterpreterProcess_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
- public createClusterInterpreterProcess_argsTupleScheme getScheme() {
- return new createClusterInterpreterProcess_argsTupleScheme();
- }
- }
-
- private static class createClusterInterpreterProcess_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<createClusterInterpreterProcess_args> {
-
- @Override
- public void write(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet optionals = new java.util.BitSet();
- if (struct.isSetIntpProcParameters()) {
- optionals.set(0);
- }
- oprot.writeBitSet(optionals, 1);
- if (struct.isSetIntpProcParameters()) {
- struct.intpProcParameters.write(oprot);
- }
- }
-
- @Override
- public void read(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_args struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet incoming = iprot.readBitSet(1);
- if (incoming.get(0)) {
- struct.intpProcParameters = new ClusterIntpProcParameters();
- struct.intpProcParameters.read(iprot);
- struct.setIntpProcParametersIsSet(true);
- }
- }
- }
-
- private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
- return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
- }
- }
-
- public static class createClusterInterpreterProcess_result implements org.apache.thrift.TBase<createClusterInterpreterProcess_result, createClusterInterpreterProcess_result._Fields>, java.io.Serializable, Cloneable, Comparable<createClusterInterpreterProcess_result> {
- private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("createClusterInterpreterProcess_result");
-
- private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.BOOL, (short)0);
-
- private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new createClusterInterpreterProcess_resultStandardSchemeFactory();
- private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new createClusterInterpreterProcess_resultTupleSchemeFactory();
-
- public boolean success; // required
-
- /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
- public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- SUCCESS((short)0, "success");
-
- private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
-
- static {
- for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
- byName.put(field.getFieldName(), field);
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, or null if its not found.
- */
- @org.apache.thrift.annotation.Nullable
- public static _Fields findByThriftId(int fieldId) {
- switch(fieldId) {
- case 0: // SUCCESS
- return SUCCESS;
- default:
- return null;
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, throwing an exception
- * if it is not found.
- */
- public static _Fields findByThriftIdOrThrow(int fieldId) {
- _Fields fields = findByThriftId(fieldId);
- if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
- return fields;
- }
-
- /**
- * Find the _Fields constant that matches name, or null if its not found.
- */
- @org.apache.thrift.annotation.Nullable
- public static _Fields findByName(java.lang.String name) {
- return byName.get(name);
- }
-
- private final short _thriftId;
- private final java.lang.String _fieldName;
-
- _Fields(short thriftId, java.lang.String fieldName) {
- _thriftId = thriftId;
- _fieldName = fieldName;
- }
-
- public short getThriftFieldId() {
- return _thriftId;
- }
-
- public java.lang.String getFieldName() {
- return _fieldName;
- }
- }
-
- // isset id assignments
- private static final int __SUCCESS_ISSET_ID = 0;
- private byte __isset_bitfield = 0;
- public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
- static {
- java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
- metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
- org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(createClusterInterpreterProcess_result.class, metaDataMap);
- }
-
- public createClusterInterpreterProcess_result() {
- }
-
- public createClusterInterpreterProcess_result(
- boolean success)
- {
- this();
- this.success = success;
- setSuccessIsSet(true);
- }
-
- /**
- * Performs a deep copy on <i>other</i>.
- */
- public createClusterInterpreterProcess_result(createClusterInterpreterProcess_result other) {
- __isset_bitfield = other.__isset_bitfield;
- this.success = other.success;
- }
-
- public createClusterInterpreterProcess_result deepCopy() {
- return new createClusterInterpreterProcess_result(this);
- }
-
- @Override
- public void clear() {
- setSuccessIsSet(false);
- this.success = false;
- }
-
- public boolean isSuccess() {
- return this.success;
- }
-
- public createClusterInterpreterProcess_result setSuccess(boolean success) {
- this.success = success;
- setSuccessIsSet(true);
- return this;
- }
-
- public void unsetSuccess() {
- __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID);
- }
-
- /** Returns true if field success is set (has been assigned a value) and false otherwise */
- public boolean isSetSuccess() {
- return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID);
- }
-
- public void setSuccessIsSet(boolean value) {
- __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, value);
- }
-
- public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
- switch (field) {
- case SUCCESS:
- if (value == null) {
- unsetSuccess();
- } else {
- setSuccess((java.lang.Boolean)value);
- }
- break;
-
- }
- }
-
- @org.apache.thrift.annotation.Nullable
- public java.lang.Object getFieldValue(_Fields field) {
- switch (field) {
- case SUCCESS:
- return isSuccess();
-
- }
- throw new java.lang.IllegalStateException();
- }
-
- /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
- public boolean isSet(_Fields field) {
- if (field == null) {
- throw new java.lang.IllegalArgumentException();
- }
-
- switch (field) {
- case SUCCESS:
- return isSetSuccess();
- }
- throw new java.lang.IllegalStateException();
- }
-
- @Override
- public boolean equals(java.lang.Object that) {
- if (that == null)
- return false;
- if (that instanceof createClusterInterpreterProcess_result)
- return this.equals((createClusterInterpreterProcess_result)that);
- return false;
- }
-
- public boolean equals(createClusterInterpreterProcess_result that) {
- if (that == null)
- return false;
- if (this == that)
- return true;
-
- boolean this_present_success = true;
- boolean that_present_success = true;
- if (this_present_success || that_present_success) {
- if (!(this_present_success && that_present_success))
- return false;
- if (this.success != that.success)
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int hashCode = 1;
-
- hashCode = hashCode * 8191 + ((success) ? 131071 : 524287);
-
- return hashCode;
- }
-
- @Override
- public int compareTo(createClusterInterpreterProcess_result other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
- int lastComparison = 0;
-
- lastComparison = java.lang.Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetSuccess()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- return 0;
- }
-
- @org.apache.thrift.annotation.Nullable
- public _Fields fieldForId(int fieldId) {
- return _Fields.findByThriftId(fieldId);
- }
-
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- scheme(iprot).read(iprot, this);
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- scheme(oprot).write(oprot, this);
- }
-
- @Override
- public java.lang.String toString() {
- java.lang.StringBuilder sb = new java.lang.StringBuilder("createClusterInterpreterProcess_result(");
- boolean first = true;
-
- sb.append("success:");
- sb.append(this.success);
- first = false;
- sb.append(")");
- return sb.toString();
- }
-
- public void validate() throws org.apache.thrift.TException {
- // check for required fields
- // check for sub-struct validity
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
- try {
- write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
- try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bitfield = 0;
- read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private static class createClusterInterpreterProcess_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
- public createClusterInterpreterProcess_resultStandardScheme getScheme() {
- return new createClusterInterpreterProcess_resultStandardScheme();
- }
- }
-
- private static class createClusterInterpreterProcess_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<createClusterInterpreterProcess_result> {
-
- public void read(org.apache.thrift.protocol.TProtocol iprot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField schemeField;
- iprot.readStructBegin();
- while (true)
- {
- schemeField = iprot.readFieldBegin();
- if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (schemeField.id) {
- case 0: // SUCCESS
- if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
- struct.success = iprot.readBool();
- struct.setSuccessIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- struct.validate();
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
- struct.validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- if (struct.isSetSuccess()) {
- oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
- oprot.writeBool(struct.success);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
-
- }
-
- private static class createClusterInterpreterProcess_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
- public createClusterInterpreterProcess_resultTupleScheme getScheme() {
- return new createClusterInterpreterProcess_resultTupleScheme();
- }
- }
-
- private static class createClusterInterpreterProcess_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<createClusterInterpreterProcess_result> {
-
- @Override
- public void write(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet optionals = new java.util.BitSet();
- if (struct.isSetSuccess()) {
- optionals.set(0);
- }
- oprot.writeBitSet(optionals, 1);
- if (struct.isSetSuccess()) {
- oprot.writeBool(struct.success);
- }
- }
-
- @Override
- public void read(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet incoming = iprot.readBitSet(1);
- if (incoming.get(0)) {
- struct.success = iprot.readBool();
- struct.setSuccessIsSet(true);
- }
- }
- }
-
- private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
- return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
- }
- }
-
-}
diff --git a/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift b/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift
deleted file mode 100644
index c6f208e..0000000
--- a/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace java org.apache.zeppelin.interpreter.thrift
-
-struct ClusterIntpProcParameters {
- 1: string host,
- 2: i32 port,
- 3: string userName,
- 4: string noteId,
- 5: string replName,
- 6: string defaultInterpreterSetting
-}
-
-service ClusterManagerService {
- bool createClusterInterpreterProcess(1: ClusterIntpProcParameters intpProcParameters);
-}
diff --git a/zeppelin-interpreter/src/main/thrift/genthrift.sh b/zeppelin-interpreter/src/main/thrift/genthrift.sh
index 31efeae..23a295a 100755
--- a/zeppelin-interpreter/src/main/thrift/genthrift.sh
+++ b/zeppelin-interpreter/src/main/thrift/genthrift.sh
@@ -21,7 +21,6 @@ rm -rf gen-java
rm -rf ../java/org/apache/zeppelin/interpreter/thrift
thrift --gen java RemoteInterpreterService.thrift
thrift --gen java RemoteInterpreterEventService.thrift
-thrift --gen java ClusterManagerService.thrift
for file in gen-java/org/apache/zeppelin/interpreter/thrift/* ; do
cat java_license_header.txt ${file} > ${file}.tmp
mv -f ${file}.tmp ${file}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
new file mode 100644
index 0000000..e2ce781
--- /dev/null
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ClusterMultiNodeTest {
+ private static Logger LOGGER = LoggerFactory.getLogger(ClusterMultiNodeTest.class);
+
+ private static List<ClusterManagerServer> clusterServers = new ArrayList<>();
+ private static ClusterManagerClient clusterClient = null;
+
+ static final String metaKey = "ClusterMultiNodeTestKey";
+
+ @BeforeClass
+ public static void startCluster() throws IOException, InterruptedException {
+ LOGGER.info("startCluster >>>");
+
+ String clusterAddrList = "";
+ String zServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
+ for (int i = 0; i < 3; i ++) {
+ // Set the cluster IP and port
+ int zServerPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ clusterAddrList += zServerHost + ":" + zServerPort;
+ if (i != 2) {
+ clusterAddrList += ",";
+ }
+ }
+ ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
+ zconf.setClusterAddress(clusterAddrList);
+
+ // mock cluster manager server
+ String cluster[] = clusterAddrList.split(",");
+ try {
+ for (int i = 0; i < 3; i ++) {
+ String[] parts = cluster[i].split(":");
+ String clusterHost = parts[0];
+ int clusterPort = Integer.valueOf(parts[1]);
+
+ Class clazz = ClusterManagerServer.class;
+ Constructor constructor = clazz.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance();
+ clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
+
+ clusterServers.add(clusterServer);
+ }
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ for (ClusterManagerServer clusterServer : clusterServers) {
+ clusterServer.start();
+ }
+
+ // mock cluster manager client
+ clusterClient = ClusterManagerClient.getInstance();
+ clusterClient.start(metaKey);
+
+ // Waiting for cluster startup
+ int wait = 0;
+ while(wait++ < 100) {
+ if (clusterIsStartup() && clusterClient.raftInitialized()) {
+ LOGGER.info("wait {}(ms) found cluster leader", wait*3000);
+ break;
+ }
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ Thread.sleep(3000);
+ assertEquals(true, clusterIsStartup());
+ LOGGER.info("startCluster <<<");
+
+ getClusterServerMeta();
+ }
+
+ @AfterClass
+ public static void stopCluster() {
+ LOGGER.info("stopCluster >>>");
+ if (null != clusterClient) {
+ clusterClient.shutdown();
+ }
+ for (ClusterManagerServer clusterServer : clusterServers) {
+ clusterServer.shutdown();
+ }
+ LOGGER.info("stopCluster <<<");
+ }
+
+ static boolean clusterIsStartup() {
+ boolean foundLeader = false;
+ for (ClusterManagerServer clusterServer : clusterServers) {
+ if (!clusterServer.raftInitialized()) {
+ LOGGER.warn("clusterServer not Initialized!");
+ return false;
+ }
+ if (clusterServer.isClusterLeader()) {
+ foundLeader = true;
+ }
+ }
+
+ if (!foundLeader) {
+ LOGGER.warn("Can not found leader!");
+ return false;
+ }
+
+ return true;
+ }
+
+ public static void getClusterServerMeta() {
+ LOGGER.info("getClusterServerMeta >>>");
+ // Get metadata for all services
+ Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+ LOGGER.info(srvMeta.toString());
+
+ Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+ LOGGER.info(intpMeta.toString());
+
+ assertNotNull(srvMeta);
+ assertEquals(true, (srvMeta instanceof HashMap));
+ HashMap hashMap = (HashMap) srvMeta;
+
+ assertEquals(hashMap.size(), 3);
+ LOGGER.info("getClusterServerMeta <<<");
+ }
+}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
similarity index 69%
copy from zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
copy to zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
index ef4d7fd..59853d4 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
@@ -1,4 +1,3 @@
-package org.apache.zeppelin.cluster;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,11 +14,13 @@ package org.apache.zeppelin.cluster;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.zeppelin.cluster;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -31,21 +32,20 @@ import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-public class ClusterManagerTest {
- private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerTest.class);
-
- private static ClusterManagerServer clusterManagerServer = null;
- private static ClusterManagerClient clusterManagerClient = null;
+public class ClusterSingleNodeTest {
+ private static Logger LOGGER = LoggerFactory.getLogger(ClusterSingleNodeTest.class);
+ private static ZeppelinConfiguration zconf;
- private static ZeppelinConfiguration zconf = null;
+ private static ClusterManagerServer clusterServer = null;
+ private static ClusterManagerClient clusterClient = null;
static String zServerHost;
static int zServerPort;
- static final String metaKey = "ClusterManagerTestKey";
+ static final String metaKey = "ClusterSingleNodeTestKey";
@BeforeClass
- public static void initClusterEnv() throws IOException, InterruptedException {
- LOGGER.info("initClusterEnv >>>");
+ public static void startCluster() throws IOException, InterruptedException {
+ LOGGER.info("startCluster >>>");
zconf = ZeppelinConfiguration.create();
@@ -55,19 +55,19 @@ public class ClusterManagerTest {
zconf.setClusterAddress(zServerHost + ":" + zServerPort);
// mock cluster manager server
- clusterManagerServer = ClusterManagerServer.getInstance();
- clusterManagerServer.start(null);
+ clusterServer = ClusterManagerServer.getInstance();
+ clusterServer.start();
// mock cluster manager client
- clusterManagerClient = ClusterManagerClient.getInstance();
- clusterManagerClient.start(metaKey);
+ clusterClient = ClusterManagerClient.getInstance();
+ clusterClient.start(metaKey);
// Waiting for cluster startup
int wait = 0;
while(wait++ < 100) {
- if (clusterManagerServer.isClusterLeader()
- && clusterManagerServer.raftInitialized()
- && clusterManagerClient.raftInitialized()) {
+ if (clusterServer.isClusterLeader()
+ && clusterServer.raftInitialized()
+ && clusterClient.raftInitialized()) {
LOGGER.info("wait {}(ms) found cluster leader", wait*3000);
break;
}
@@ -77,19 +77,33 @@ public class ClusterManagerTest {
e.printStackTrace();
}
}
- assertEquals(true, clusterManagerServer.isClusterLeader());
- LOGGER.info("initClusterEnv <<<");
+ Thread.sleep(3000);
+ assertEquals(true, clusterServer.isClusterLeader());
+ LOGGER.info("startCluster <<<");
+ }
+
+ @AfterClass
+ public static void stopCluster() {
+ if (null != clusterClient) {
+ clusterClient.shutdown();
+ }
+ if (null != clusterClient) {
+ clusterServer.shutdown();
+ }
+ LOGGER.info("stopCluster");
}
@Test
public void getServerMeta() {
- LOGGER.info("serverMeta >>>");
+ LOGGER.info("getServerMeta >>>");
// Get metadata for all services
- Object meta = clusterManagerClient.getClusterMeta(ClusterMetaType.ServerMeta, "");
-
+ Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
LOGGER.info(meta.toString());
+ Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+ LOGGER.info(intpMeta.toString());
+
assertNotNull(meta);
assertEquals(true, (meta instanceof HashMap));
HashMap hashMap = (HashMap) meta;
@@ -101,7 +115,7 @@ public class ClusterManagerTest {
assertEquals(true, mapMetaValues.size()>0);
- LOGGER.info("serverMeta <<<");
+ LOGGER.info("getServerMeta <<<");
}
@Test
@@ -110,8 +124,6 @@ public class ClusterManagerTest {
HashMap<String, Object> meta = new HashMap<>();
meta.put(ClusterMeta.SERVER_HOST, zServerHost);
meta.put(ClusterMeta.SERVER_PORT, zServerPort);
- meta.put(ClusterMeta.SERVER_TSERVER_HOST, "SERVER_TSERVER_HOST");
- meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT");
meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT");
meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY");
@@ -120,11 +132,11 @@ public class ClusterManagerTest {
meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
// put IntpProcess Meta
- clusterManagerClient.putClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey, meta);
+ clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta);
// get IntpProcess Meta
HashMap<String, HashMap<String, Object>> check
- = clusterManagerClient.getClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey);
+ = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey);
LOGGER.info(check.toString());
diff --git a/zeppelin-plugins/launcher/cluster/pom.xml b/zeppelin-plugins/launcher/cluster/pom.xml
new file mode 100644
index 0000000..80ca0b3
--- /dev/null
+++ b/zeppelin-plugins/launcher/cluster/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zengine-plugins-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../../../zeppelin-plugins</relativePath>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>launcher-cluster</artifactId>
+ <packaging>jar</packaging>
+ <version>0.9.0-SNAPSHOT</version>
+ <name>Zeppelin: Plugin Cluster Launcher</name>
+ <description>Launcher implementation to run interpreters on cluster</description>
+
+ <properties>
+ <plugin.name>Launcher/ClusterInterpreterLauncher</plugin.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>launcher-standard</artifactId>
+ <version>0.9.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testResources>
+ <testResource>
+ <directory>${project.basedir}/src/test/resources</directory>
+ </testResource>
+ <testResource>
+ <directory>${project.basedir}/src/main/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <includes>
+ <include>**/*.*</include>
+ </includes>
+ </resource>
+ </resources>
+ </build>
+</project>
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
new file mode 100644
index 0000000..7d8ff1e
--- /dev/null
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.apache.zeppelin.cluster.event.ClusterEvent;
+import org.apache.zeppelin.cluster.event.ClusterEventListener;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.zeppelin.cluster.event.ClusterEvent.CREATE_INTP_PROCESS;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
+/**
+ * Interpreter Launcher which use cluster to launch the interpreter process.
+ */
+public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
+ implements ClusterEventListener {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncher.class);
+
+ public static final int CHECK_META_INTERVAL = 500; // ms
+ private InterpreterLaunchContext context;
+ private ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
+
+ public ClusterInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage)
+ throws IOException {
+ super(zConf, recoveryStorage);
+ clusterServer.addClusterEventListeners(this);
+ }
+
+ @Override
+ public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
+ LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+
+ this.context = context;
+ this.properties = context.getProperties();
+ int connectTimeout = getConnectTimeout();
+ String intpGroupId = context.getInterpreterGroupId();
+
+ HashMap<String, Object> intpProcMeta = clusterServer
+ .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+ if (null != intpProcMeta && intpProcMeta.containsKey(INTP_TSERVER_HOST)
+ && intpProcMeta.containsKey(INTP_TSERVER_PORT)) {
+ // connect exist Interpreter Process
+ String intpTserverHost = (String) intpProcMeta.get(INTP_TSERVER_HOST);
+ int intpTserverPort = (int) intpProcMeta.get(INTP_TSERVER_PORT);
+ return new RemoteInterpreterRunningProcess(
+ context.getInterpreterSettingName(),
+ connectTimeout,
+ intpTserverHost,
+ intpTserverPort);
+ } else {
+ // No process was found for the InterpreterGroup ID
+ HashMap<String, Object> meta = clusterServer.getIdleNodeMeta();
+ if (null == meta) {
+ LOGGER.error("Don't get idle node meta, launch interpreter on local.");
+ super.launch(context);
+ }
+
+ String srvHost = (String) meta.get(SERVER_HOST);
+ String localhost = RemoteInterpreterUtils.findAvailableHostAddress();
+
+ if (localhost.equalsIgnoreCase(srvHost) && false) {
+ // launch interpreter on local
+ return super.launch(context);
+ } else {
+ int srvPort = (int) meta.get(SERVER_PORT);
+
+ Gson gson = new Gson();
+ String sContext = gson.toJson(context);
+
+ Map<String, Object> mapEvent = new HashMap<>();
+ mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS);
+ mapEvent.put(CLUSTER_EVENT_MSG, sContext);
+ String strEvent = gson.toJson(mapEvent);
+ clusterServer.unicastClusterEvent(srvHost, srvPort, strEvent);
+
+ HashMap<String, Object> intpMeta = clusterServer
+ .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+ int retryGetMeta = connectTimeout / CHECK_META_INTERVAL;
+ while ((retryGetMeta-- > 0) &
+ (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+ || !intpMeta.containsKey(INTP_TSERVER_PORT)) ) {
+ try {
+ Thread.sleep(CHECK_META_INTERVAL);
+ intpMeta = clusterServer
+ .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+ LOGGER.warn("retry {} times to get {} meta!", retryGetMeta, intpGroupId);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ // Check if the remote creation process is successful
+ if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+ || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
+ LOGGER.error("Creating process {} failed on remote server {}:{}",
+ intpGroupId, srvHost, srvPort);
+
+ // launch interpreter on local
+ return super.launch(context);
+ } else {
+ // connnect remote interpreter process
+ String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
+ int intpTSrvPort = (int) intpMeta.get(INTP_TSERVER_PORT);
+ return new RemoteInterpreterRunningProcess(
+ context.getInterpreterSettingName(),
+ connectTimeout,
+ intpTSrvHost,
+ intpTSrvPort);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onClusterEvent(String event) {
+ Gson gson = new Gson();
+ Map<String, Object> mapEvent = gson.fromJson(event,
+ new TypeToken<Map<String, Object>>(){}.getType());
+ String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
+ ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
+
+ switch (clusterEvent) {
+ case CREATE_INTP_PROCESS:
+ onCreateIntpProcess(mapEvent);
+ break;
+ default:
+ LOGGER.error("Unknown Cluster Event : {}", clusterEvent);
+ break;
+ }
+ }
+
+ private void onCreateIntpProcess(Map<String, Object> mapEvent) {
+ String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
+ try {
+ Gson gson = new Gson();
+ InterpreterLaunchContext context = gson.fromJson(
+ eventMsg, new TypeToken<InterpreterLaunchContext>() {}.getType());
+
+ this.properties = context.getProperties();
+ InterpreterOption option = context.getOption();
+ InterpreterRunner runner = context.getRunner();
+ String intpSetGroupName = context.getInterpreterSettingGroup();
+ String intpSetName = context.getInterpreterSettingName();
+ int connectTimeout = getConnectTimeout();
+ String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+ + context.getInterpreterSettingId();
+
+ ClusterInterpreterProcess clusterInterpreterProcess
+ = new ClusterInterpreterProcess(
+ runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
+ context.getZeppelinServerRPCPort(),
+ context.getZeppelinServerHost(),
+ zConf.getInterpreterPortRange(),
+ zConf.getInterpreterDir() + "/" + intpSetGroupName,
+ localRepoPath,
+ buildEnvFromProperties(context),
+ connectTimeout,
+ intpSetName,
+ context.getInterpreterGroupId(),
+ option.isUserImpersonate());
+
+ clusterInterpreterProcess.start(context.getUserName());
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+}
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
new file mode 100644
index 0000000..986c2ed
--- /dev/null
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -0,0 +1,141 @@
+package org.apache.zeppelin.interpreter.launcher;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
+
+public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess {
+ private static final Logger LOGGER
+ = LoggerFactory.getLogger(ClusterInterpreterProcess.class);
+
+ private String intpHost = "";
+ private int intpPort = 0;
+
+ private ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
+
+ public ClusterInterpreterProcess(
+ String intpRunner,
+ int zeppelinServerRPCPort,
+ String zeppelinServerRPCHost,
+ String interpreterPortRange,
+ String intpDir,
+ String localRepoDir,
+ Map<String, String> env,
+ int connectTimeout,
+ String interpreterSettingName,
+ String interpreterGroupId,
+ boolean isUserImpersonated) {
+
+ super(intpRunner,
+ zeppelinServerRPCPort,
+ zeppelinServerRPCHost,
+ interpreterPortRange,
+ intpDir,
+ localRepoDir,
+ env,
+ connectTimeout,
+ interpreterSettingName,
+ interpreterGroupId,
+ isUserImpersonated);
+ }
+
+ @Override
+ public void start(String userName) throws IOException {
+ CheckIntpRunStatusThread checkIntpRunStatusThread = new CheckIntpRunStatusThread(this);
+ checkIntpRunStatusThread.start();
+
+ super.start(userName);
+ }
+
+ @Override
+ public void processStarted(int port, String host) {
+ // Cluster mode, discovering interpreter processes through metadata registration
+ this.intpHost = host;
+ this.intpPort = port;
+ super.processStarted(port, host);
+ }
+
+ @Override
+ public String getHost() {
+ return intpHost;
+ }
+
+ @Override
+ public int getPort() {
+ return intpPort;
+ }
+
+ @Override
+ public boolean isRunning() {
+ if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return null;
+ }
+
+ // Metadata registered in the cluster by the interpreter process,
+ // Keep the interpreter process started
+ private class CheckIntpRunStatusThread extends Thread {
+ private ClusterInterpreterProcess intpProcess;
+
+ CheckIntpRunStatusThread(ClusterInterpreterProcess intpProcess) {
+ this.intpProcess = intpProcess;
+ }
+
+ @Override
+ public void run() {
+ LOGGER.info("checkIntpRunStatusThread run() >>>");
+
+ String intpGroupId = getInterpreterGroupId();
+
+ HashMap<String, Object> intpMeta = clusterServer
+ .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+ int connectTimeout = intpProcess.getConnectTimeout();
+ int retryGetMeta = connectTimeout / ClusterInterpreterLauncher.CHECK_META_INTERVAL;
+ while ((retryGetMeta-- > 0)
+ && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+ || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
+ try {
+ Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL);
+ intpMeta = clusterServer
+ .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+ LOGGER.info("retry {} times to get {} meta!", retryGetMeta, intpGroupId);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST)
+ && intpMeta.containsKey(INTP_TSERVER_PORT)) {
+ String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST);
+ int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
+ LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
+
+ intpProcess.processStarted(intpPort, intpHost);
+ }
+ }
+
+ if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+ || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
+ LOGGER.error("Can not found interpreter meta!");
+ }
+
+ LOGGER.info("checkIntpRunStatusThread run() <<<");
+ }
+ }
+}
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
new file mode 100644
index 0000000..da4bf5e
--- /dev/null
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterInterpreterLauncherTest extends ClusterMockTest {
+
+ @BeforeClass
+ public static void startTest() throws IOException, InterruptedException {
+ ClusterMockTest.startCluster();
+ }
+
+ @AfterClass
+ public static void stopTest() throws IOException, InterruptedException {
+ ClusterMockTest.stopCluster();
+ }
+
+ @Before
+ public void setUp() {
+ for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
+ System.clearProperty(confVar.getVarName());
+ }
+ }
+
+ @Test
+ public void testConnectExistIntpProcess() throws IOException {
+ mockIntpProcessMeta("intpGroupId");
+
+ ClusterInterpreterLauncher launcher
+ = new ClusterInterpreterLauncher(ClusterMockTest.zconf, null);
+ Properties properties = new Properties();
+ properties.setProperty(
+ ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
+ InterpreterOption option = new InterpreterOption();
+ option.setUserImpersonate(true);
+ InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null,
+ "user1", "intpGroupId", "groupId",
+ "groupName", "name", 0, "host");
+ InterpreterClient client = launcher.launch(context);
+
+ assertTrue(client instanceof RemoteInterpreterRunningProcess);
+ RemoteInterpreterRunningProcess interpreterProcess = (RemoteInterpreterRunningProcess) client;
+ assertEquals("INTP_TSERVER_HOST", interpreterProcess.getHost());
+ assertEquals(0, interpreterProcess.getPort());
+ assertEquals("name", interpreterProcess.getInterpreterSettingName());
+ assertEquals(5000, interpreterProcess.getConnectTimeout());
+ }
+
+ @Test
+ public void testCreateIntpProcess() throws IOException {
+ ClusterInterpreterLauncher launcher
+ = new ClusterInterpreterLauncher(ClusterMockTest.zconf, null);
+ Properties properties = new Properties();
+ properties.setProperty(
+ ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000");
+ InterpreterOption option = new InterpreterOption();
+ option.setUserImpersonate(true);
+ InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null,
+ "user1", "intpGroupId", "groupId",
+ "groupName", "name", 0, "host");
+ InterpreterClient client = launcher.launch(context);
+
+ assertTrue(client instanceof RemoteInterpreterManagedProcess);
+ RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
+ assertEquals("name", interpreterProcess.getInterpreterSettingName());
+ assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
+ assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
+ assertEquals(5000, interpreterProcess.getConnectTimeout());
+ assertEquals(zconf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
+ assertTrue(interpreterProcess.getEnv().size() >= 1);
+ assertEquals(true, interpreterProcess.isUserImpersonated());
+ }
+}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
similarity index 61%
rename from zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
rename to zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
index ef4d7fd..72b51b4 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
@@ -1,4 +1,3 @@
-package org.apache.zeppelin.cluster;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,13 +14,14 @@ package org.apache.zeppelin.cluster;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.zeppelin.interpreter.launcher;
+import org.apache.zeppelin.cluster.ClusterManagerClient;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.junit.BeforeClass;
-import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,21 +31,20 @@ import java.util.HashMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-public class ClusterManagerTest {
- private static Logger LOGGER = LoggerFactory.getLogger(ClusterManagerTest.class);
+public class ClusterMockTest {
+ private static Logger LOGGER = LoggerFactory.getLogger(ClusterMockTest.class);
- private static ClusterManagerServer clusterManagerServer = null;
- private static ClusterManagerClient clusterManagerClient = null;
+ private static ClusterManagerServer clusterServer = null;
+ private static ClusterManagerClient clusterClient = null;
- private static ZeppelinConfiguration zconf = null;
+ protected static ZeppelinConfiguration zconf = null;
static String zServerHost;
static int zServerPort;
- static final String metaKey = "ClusterManagerTestKey";
+ static final String metaKey = "ClusterMockKey";
- @BeforeClass
- public static void initClusterEnv() throws IOException, InterruptedException {
- LOGGER.info("initClusterEnv >>>");
+ public static void startCluster() throws IOException, InterruptedException {
+ LOGGER.info("startCluster >>>");
zconf = ZeppelinConfiguration.create();
@@ -55,20 +54,20 @@ public class ClusterManagerTest {
zconf.setClusterAddress(zServerHost + ":" + zServerPort);
// mock cluster manager server
- clusterManagerServer = ClusterManagerServer.getInstance();
- clusterManagerServer.start(null);
+ clusterServer = ClusterManagerServer.getInstance();
+ clusterServer.start();
// mock cluster manager client
- clusterManagerClient = ClusterManagerClient.getInstance();
- clusterManagerClient.start(metaKey);
+ clusterClient = ClusterManagerClient.getInstance();
+ clusterClient.start(metaKey);
// Waiting for cluster startup
int wait = 0;
- while(wait++ < 100) {
- if (clusterManagerServer.isClusterLeader()
- && clusterManagerServer.raftInitialized()
- && clusterManagerClient.raftInitialized()) {
- LOGGER.info("wait {}(ms) found cluster leader", wait*3000);
+ while (wait++ < 100) {
+ if (clusterServer.isClusterLeader()
+ && clusterServer.raftInitialized()
+ && clusterClient.raftInitialized()) {
+ LOGGER.info("wait {}(ms) found cluster leader", wait * 3000);
break;
}
try {
@@ -77,16 +76,27 @@ public class ClusterManagerTest {
e.printStackTrace();
}
}
- assertEquals(true, clusterManagerServer.isClusterLeader());
- LOGGER.info("initClusterEnv <<<");
+ assertEquals(true, clusterServer.isClusterLeader());
+
+ LOGGER.info("startCluster <<<");
+ }
+
+ public static void stopCluster() {
+ LOGGER.info("stopCluster >>>");
+ if (null != clusterClient) {
+ clusterClient.shutdown();
+ }
+ if (null != clusterClient) {
+ clusterServer.shutdown();
+ }
+ LOGGER.info("stopCluster <<<");
}
- @Test
public void getServerMeta() {
LOGGER.info("serverMeta >>>");
// Get metadata for all services
- Object meta = clusterManagerClient.getClusterMeta(ClusterMetaType.ServerMeta, "");
+ Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
LOGGER.info(meta.toString());
@@ -99,37 +109,34 @@ public class ClusterManagerTest {
assertEquals(true, (values instanceof HashMap));
HashMap mapMetaValues = (HashMap) values;
- assertEquals(true, mapMetaValues.size()>0);
+ assertEquals(true, mapMetaValues.size() > 0);
LOGGER.info("serverMeta <<<");
}
- @Test
- public void putIntpProcessMeta() {
+ public void mockIntpProcessMeta(String metaKey) {
// mock IntpProcess Meta
HashMap<String, Object> meta = new HashMap<>();
- meta.put(ClusterMeta.SERVER_HOST, zServerHost);
- meta.put(ClusterMeta.SERVER_PORT, zServerPort);
- meta.put(ClusterMeta.SERVER_TSERVER_HOST, "SERVER_TSERVER_HOST");
- meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT");
+ meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST");
+ meta.put(ClusterMeta.SERVER_PORT, 0);
meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
- meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT");
+ meta.put(ClusterMeta.INTP_TSERVER_PORT, 0);
meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY");
meta.put(ClusterMeta.CPU_USED, "CPU_USED");
meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY");
meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
// put IntpProcess Meta
- clusterManagerClient.putClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey, meta);
+ clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta);
// get IntpProcess Meta
HashMap<String, HashMap<String, Object>> check
- = clusterManagerClient.getClusterMeta(ClusterMetaType.IntpProcessMeta, metaKey);
+ = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey);
LOGGER.info(check.toString());
assertNotNull(check);
assertNotNull(check.get(metaKey));
- assertEquals(true, check.get(metaKey).size()>0);
+ assertEquals(true, check.get(metaKey).size() == 8);
}
}
diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml
index 6c7aea9..727acd7 100644
--- a/zeppelin-plugins/pom.xml
+++ b/zeppelin-plugins/pom.xml
@@ -47,6 +47,7 @@
<module>launcher/standard</module>
<module>launcher/k8s-standard</module>
<module>launcher/spark</module>
+ <module>launcher/cluster</module>
</modules>
<dependencies>
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index cf9074f..1e180d8 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -31,7 +31,6 @@ import javax.servlet.ServletContextListener;
import org.apache.commons.lang.StringUtils;
import org.apache.shiro.web.env.EnvironmentLoaderListener;
import org.apache.shiro.web.servlet.ShiroFilter;
-import org.apache.zeppelin.cluster.ClusterManager;
import org.apache.zeppelin.cluster.ClusterManagerServer;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
@@ -43,8 +42,8 @@ import org.apache.zeppelin.helium.HeliumBundleFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.thrift.ClusterManagerService;
import org.apache.zeppelin.notebook.NoteEventListener;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.AuthorizationService;
@@ -53,6 +52,7 @@ import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.notebook.scheduler.NoSchedulerService;
import org.apache.zeppelin.notebook.scheduler.QuartzSchedulerService;
import org.apache.zeppelin.notebook.scheduler.SchedulerService;
+import org.apache.zeppelin.plugin.PluginManager;
import org.apache.zeppelin.rest.exception.WebApplicationExceptionMapper;
import org.apache.zeppelin.search.LuceneSearch;
import org.apache.zeppelin.search.SearchService;
@@ -96,10 +96,10 @@ public class ZeppelinServer extends ResourceConfig {
public static Server jettyWebServer;
public static ServiceLocator sharedServiceLocator;
+ private static ZeppelinConfiguration conf = ZeppelinConfiguration.create();
+
@Inject
public ZeppelinServer() {
- ZeppelinConfiguration conf = ZeppelinConfiguration.create();
-
InterpreterOutput.limit = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT);
packages("org.apache.zeppelin.rest");
@@ -162,10 +162,6 @@ public class ZeppelinServer extends ResourceConfig {
.to(NoteEventListener.class)
.to(WebSocketServlet.class)
.in(Singleton.class);
- bindAsContract(ClusterManagerServer.class)
- .to(ClusterManager.class)
- .to(ClusterManagerService.Iface.class)
- .in(Singleton.class);
if (conf.isZeppelinNotebookCronEnable()) {
bind(QuartzSchedulerService.class).to(SchedulerService.class).in(Singleton.class);
} else {
@@ -356,9 +352,9 @@ public class ZeppelinServer extends ResourceConfig {
}
private static void setupClusterManagerServer(ServiceLocator serviceLocator) {
- InterpreterFactory interpreterFactory
- = sharedServiceLocator.getService(InterpreterFactory.class);
- sharedServiceLocator.getService(ClusterManagerServer.class).start(interpreterFactory);
+ if (conf.isClusterMode()) {
+ ClusterManagerServer.getInstance().start();
+ }
}
private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) {
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 8eebce4..2fbef23 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -670,6 +670,8 @@ public class InterpreterSetting {
public String getLauncherPlugin() {
if (isRunningOnKubernetes()) {
return "K8sStandardInterpreterLauncher";
+ } else if (isRunningOnCluster()) {
+ return "ClusterInterpreterLauncher";
} else {
if (group.equals("spark")) {
return "SparkInterpreterLauncher";
@@ -683,6 +685,10 @@ public class InterpreterSetting {
return conf.getRunMode() == ZeppelinConfiguration.RUN_MODE.K8S;
}
+ private boolean isRunningOnCluster() {
+ return conf.isClusterMode();
+ }
+
public boolean isUserAuthorized(List<String> userAndRoles) {
if (!option.permissionIsSet()) {
return true;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 02e8fd0..6d91f1d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -173,6 +173,10 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess {
return interpreterSettingName;
}
+ public String getInterpreterGroupId() {
+ return interpreterGroupId;
+ }
+
@VisibleForTesting
public String getInterpreterRunner() {
return interpreterRunner;