You are viewing a plain text version of this content. The canonical link for it is here.
Posted to submarine-dev@hadoop.apache.org by li...@apache.org on 2019/10/23 02:41:51 UTC
[hadoop-submarine] branch master updated: SUBMARINE-256.
Interpreter support cluster mode
This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 0ed88ff SUBMARINE-256. Interpreter support cluster mode
0ed88ff is described below
commit 0ed88ff6ba9940ffc865e4b7e9b9146cf0e1efda
Author: Xun Liu <li...@apache.org>
AuthorDate: Tue Oct 22 17:33:26 2019 +0800
SUBMARINE-256. Interpreter support cluster mode
### What is this PR for?
After the interpreter integrates the cluster module,
When the interpreter is started, the `interpreter ID`, `IP` and `port` of the interpreter are registered in the metadata of the cluster, so that the interpreter can be found in the entire cluster and used by the notebook and workbench server.
### What type of PR is it?
[Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/SUBMARINE-256
### How should this be tested?
* [CI Pass](https://travis-ci.org/liuxunorg/hadoop-submarine/builds/601138608)
* InterpreterClusterTest.java
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Xun Liu <li...@apache.org>
Closes #60 from liuxunorg/SUBMARINE-256 and squashes the following commits:
dc1ba6e [Xun Liu] SUBMARINE-256. Interpreter support cluster mode
---
pom.xml | 3 +-
submarine-commons/commons-cluster/pom.xml | 2 +-
...lusterManagerClient.java => ClusterClient.java} | 24 ++--
.../submarine/commons/cluster/ClusterManager.java | 21 +--
.../submarine/commons/cluster/ClusterMonitor.java | 4 +-
...lusterManagerServer.java => ClusterServer.java} | 45 +++---
.../commons/cluster/ClusterMultiNodeTest.java | 18 +--
submarine-commons/pom.xml | 1 +
.../interpreter/interpreter-engine/pom.xml | 14 +-
.../apache/submarine/interpreter/Interpreter.java | 29 ++++
.../submarine/interpreter/InterpreterProcess.java | 158 ++++++++++++++++----
.../src/main/resources/log4j.properties | 0
.../interpreter/python-interpreter/pom.xml | 22 +++
.../submarine/interpreter/PythonInterpreter.java | 7 +-
.../src/main/resources/log4j.properties | 2 +-
.../interpreter/InterpreterClusterTest.java | 159 +++++++++++++++++++++
.../apache/submarine/server/WorkbenchServer.java | 12 +-
.../server/WorkbenchClusterServerTest.java | 8 +-
18 files changed, 436 insertions(+), 93 deletions(-)
diff --git a/pom.xml b/pom.xml
index 0b95abb..fbc6852 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
<properties>
<submarine.version>0.3.0-SNAPSHOT</submarine.version>
+
<!-- language versions -->
<java.version>1.8</java.version>
@@ -92,7 +93,7 @@
<commons-lang3.version>3.4</commons-lang3.version>
<commons.io.version>2.5</commons.io.version>
<junit.version>4.12</junit.version>
- <jsr305.version>3.0.0</jsr305.version>
+ <jsr305.version>1.3.9</jsr305.version>
<mockito.version>2.23.4</mockito.version>
<powermock.version>1.6.4</powermock.version>
<guava.version>22.0</guava.version>
diff --git a/submarine-commons/commons-cluster/pom.xml b/submarine-commons/commons-cluster/pom.xml
index cbfbc29..8d4f9c7 100644
--- a/submarine-commons/commons-cluster/pom.xml
+++ b/submarine-commons/commons-cluster/pom.xml
@@ -34,7 +34,7 @@
<dependency>
<groupId>org.apache.submarine</groupId>
<artifactId>commons-utils</artifactId>
- <version>0.3.0-SNAPSHOT</version>
+ <version>${submarine.version}</version>
</dependency>
<dependency>
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterClient.java
similarity index 78%
rename from submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
rename to submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterClient.java
index 930930e..cdffbf3 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerClient.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterClient.java
@@ -25,23 +25,23 @@ import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PRO
/**
* Cluster management client class instantiated in submarine-interperter
*/
-public class ClusterManagerClient extends ClusterManager {
- private static Logger LOG = LoggerFactory.getLogger(ClusterManagerClient.class);
+public class ClusterClient extends ClusterManager {
+ private static Logger LOG = LoggerFactory.getLogger(ClusterClient.class);
- private static ClusterManagerClient instance = null;
+ private static ClusterClient instance = null;
// Do not use the getInstance function in the test case,
// which will result in an inability to update the instance according to the configuration.
- public static ClusterManagerClient getInstance() {
- synchronized (ClusterManagerClient.class) {
+ public static ClusterClient getInstance() {
+ synchronized (ClusterClient.class) {
if (instance == null) {
- instance = new ClusterManagerClient();
+ instance = new ClusterClient();
}
- return instance;
}
+ return instance;
}
- private ClusterManagerClient() {
+ private ClusterClient() {
super();
}
@@ -60,9 +60,9 @@ public class ClusterManagerClient extends ClusterManager {
return false;
}
- // In the ClusterManagerClient metaKey equal interperterGroupId
+ // In the ClusterClient metaKey equal interperterGroupId
public void start(String metaKey) {
- LOG.info("ClusterManagerClient::start({})", metaKey);
+ LOG.info("ClusterClient::start({})", metaKey);
if (!sconf.workbenchIsClusterMode()) {
return;
}
@@ -77,7 +77,9 @@ public class ClusterManagerClient extends ClusterManager {
if (!sconf.workbenchIsClusterMode()) {
return;
}
- clusterMonitor.shutdown();
+ if (null != clusterMonitor) {
+ clusterMonitor.shutdown();
+ }
super.shutdown();
}
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
index 71dec98..816aad8 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
@@ -145,7 +145,7 @@ public abstract class ClusterManager {
= new ConcurrentLinkedQueue<>();
// submarine server host & port
- protected String zeplServerHost = "";
+ protected String serverHost = "";
protected ClusterMonitor clusterMonitor = null;
@@ -153,7 +153,7 @@ public abstract class ClusterManager {
protected ClusterManager() {
try {
- zeplServerHost = NetUtils.findAvailableHostAddress();
+ this.serverHost = NetUtils.findAvailableHostAddress();
String clusterAddr = sconf.getClusterAddress();
LOG.info(this.getClass().toString() + "::clusterAddr = {}", clusterAddr);
if (!StringUtils.isEmpty(clusterAddr)) {
@@ -163,7 +163,7 @@ public abstract class ClusterManager {
String[] parts = cluster[i].split(":");
String clusterHost = parts[0];
int clusterPort = Integer.valueOf(parts[1]);
- if (zeplServerHost.equalsIgnoreCase(clusterHost)) {
+ if (this.serverHost.equalsIgnoreCase(clusterHost)) {
raftServerPort = clusterPort;
}
@@ -219,8 +219,8 @@ public abstract class ClusterManager {
LOG.error(e.getMessage());
}
- MemberId memberId = MemberId.from(zeplServerHost + ":" + raftClientPort);
- Address address = Address.from(zeplServerHost, raftClientPort);
+ MemberId memberId = MemberId.from(serverHost + ":" + raftClientPort);
+ Address address = Address.from(serverHost, raftClientPort);
raftAddressMap.put(memberId, address);
MessagingService messagingManager
@@ -299,21 +299,24 @@ public abstract class ClusterManager {
try {
if (null != raftSessionClient) {
- raftSessionClient.close().get(3, TimeUnit.SECONDS);
+ LOG.info("ClusterManager::shutdown(raftSessionClient)");
+ raftSessionClient.close().get(5, TimeUnit.SECONDS);
}
if (null != raftClient) {
- raftClient.close().get(3, TimeUnit.SECONDS);
+ LOG.info("ClusterManager::shutdown(raftClient)");
+ raftClient.close().get(5, TimeUnit.SECONDS);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
}
+ LOG.info("ClusterManager::shutdown()");
}
public String getClusterNodeName() {
if (isTest) {
// Start three cluster servers in the test case at the same time,
// need to avoid duplicate names
- return this.zeplServerHost + ":" + this.raftServerPort;
+ return this.serverHost + ":" + this.raftServerPort;
}
String hostName = "";
@@ -342,7 +345,7 @@ public abstract class ClusterManager {
}
// add cluster name
- newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost);
+ newMetaValue.put(ClusterMeta.SERVER_HOST, serverHost);
newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort);
raftSessionClient.execute(operation(ClusterStateMachine.PUT,
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
index 4521710..ddc5527 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterMonitor.java
@@ -72,8 +72,8 @@ public class ClusterMonitor {
// and the interperterGroupID when monitoring the interperter processes
private String metaKey;
- public ClusterMonitor(ClusterManager clusterManagerServer) {
- this.clusterManager = clusterManagerServer;
+ public ClusterMonitor(ClusterManager clusterManager) {
+ this.clusterManager = clusterManager;
SubmarineConfiguration sconf = SubmarineConfiguration.create();
heartbeatInterval = sconf.getClusterHeartbeatInterval();
diff --git a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
similarity index 89%
rename from submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
rename to submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
index be3273d..b7ec140 100644
--- a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManagerServer.java
+++ b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
@@ -58,29 +58,29 @@ import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.SERVER_M
* 1. Create a raft server
* 2. Remotely create interpreter's thrift service
*/
-public class ClusterManagerServer extends ClusterManager {
- private static Logger LOG = LoggerFactory.getLogger(ClusterManagerServer.class);
+public class ClusterServer extends ClusterManager {
+ private static Logger LOG = LoggerFactory.getLogger(ClusterServer.class);
- private static ClusterManagerServer instance = null;
+ private static ClusterServer instance = null;
// raft server
protected RaftServer raftServer = null;
protected MessagingService messagingService = null;
- private ClusterManagerServer() {
+ private ClusterServer() {
super();
}
// Do not use the getInstance function in the test case,
// which will result in an inability to update the instance according to the configuration.
- public static ClusterManagerServer getInstance() {
- synchronized (ClusterManagerServer.class) {
+ public static ClusterServer getInstance() {
+ synchronized (ClusterServer.class) {
if (instance == null) {
- instance = new ClusterManagerServer();
+ instance = new ClusterServer();
}
- return instance;
}
+ return instance;
}
public void start() {
@@ -99,9 +99,9 @@ public class ClusterManagerServer extends ClusterManager {
}
@VisibleForTesting
- public void initTestCluster(String clusterAddrList, String host, int port) {
+ void initTestCluster(String clusterAddrList, String host, int port) {
isTest = true;
- this.zeplServerHost = host;
+ this.serverHost = host;
this.raftServerPort = port;
// clear
@@ -153,8 +153,8 @@ public class ClusterManagerServer extends ClusterManager {
public void run() {
LOG.info("RaftServer run() >>>");
- Address address = Address.from(zeplServerHost, raftServerPort);
- Member member = Member.builder(MemberId.from(zeplServerHost + ":" + raftServerPort))
+ Address address = Address.from(serverHost, raftServerPort);
+ Member member = Member.builder(MemberId.from(serverHost + ":" + raftServerPort))
.withAddress(address)
.build();
messagingService = NettyMessagingService.builder()
@@ -200,7 +200,7 @@ public class ClusterManagerServer extends ClusterManager {
HashMap<String, Object> meta = new HashMap<String, Object>();
String nodeName = getClusterNodeName();
meta.put(ClusterMeta.NODE_NAME, nodeName);
- meta.put(ClusterMeta.SERVER_HOST, zeplServerHost);
+ meta.put(ClusterMeta.SERVER_HOST, serverHost);
meta.put(ClusterMeta.SERVER_PORT, raftServerPort);
meta.put(ClusterMeta.SERVER_START_TIME, LocalDateTime.now());
putClusterMeta(SERVER_META, nodeName, meta);
@@ -215,27 +215,34 @@ public class ClusterManagerServer extends ClusterManager {
if (!sconf.workbenchIsClusterMode()) {
return;
}
+ LOG.info("ClusterServer::shutdown()");
try {
// delete local machine meta
deleteClusterMeta(SERVER_META, getClusterNodeName());
- Thread.sleep(300);
- clusterMonitor.shutdown();
+ Thread.sleep(500);
+ if (null != clusterMonitor) {
+ clusterMonitor.shutdown();
+ }
// wait raft commit metadata
- Thread.sleep(300);
+ Thread.sleep(500);
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
}
+ // close raft client
+ super.shutdown();
+
if (null != raftServer && raftServer.isRunning()) {
try {
- raftServer.shutdown().get(3, TimeUnit.SECONDS);
+ LOG.info("ClusterServer::raftServer.shutdown()");
+ raftServer.shutdown().get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
}
}
- super.shutdown();
+ LOG.info("ClusterServer::super.shutdown()");
}
// Obtain the server node whose resources are idle in the cluster
@@ -284,7 +291,7 @@ public class ClusterManagerServer extends ClusterManager {
LOG.debug("send broadcastClusterEvent message {}", msg);
}
for (Node node : clusterNodes) {
- if (StringUtils.equals(node.address().host(), zeplServerHost)
+ if (StringUtils.equals(node.address().host(), serverHost)
&& node.address().port() == raftServerPort) {
// skip myself
continue;
diff --git a/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java b/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
index 6a4c39a..b726323 100644
--- a/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
+++ b/submarine-commons/commons-cluster/src/test/java/org/apache/submarine/commons/cluster/ClusterMultiNodeTest.java
@@ -38,8 +38,8 @@ import static org.junit.Assert.assertNotNull;
public class ClusterMultiNodeTest {
private static Logger LOG = LoggerFactory.getLogger(ClusterMultiNodeTest.class);
- private static List<ClusterManagerServer> clusterServers = new ArrayList<>();
- private static ClusterManagerClient clusterClient = null;
+ private static List<ClusterServer> clusterServers = new ArrayList<>();
+ private static ClusterClient clusterClient = null;
static final String metaKey = "ClusterMultiNodeTestKey";
@@ -69,10 +69,10 @@ public class ClusterMultiNodeTest {
String clusterHost = parts[0];
int clusterPort = Integer.valueOf(parts[1]);
- Class clazz = ClusterManagerServer.class;
+ Class clazz = ClusterServer.class;
Constructor constructor = clazz.getDeclaredConstructor();
constructor.setAccessible(true);
- ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance();
+ ClusterServer clusterServer = (ClusterServer) constructor.newInstance();
clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
clusterServers.add(clusterServer);
@@ -81,17 +81,17 @@ public class ClusterMultiNodeTest {
LOG.error(e.getMessage(), e);
}
- for (ClusterManagerServer clusterServer : clusterServers) {
+ for (ClusterServer clusterServer : clusterServers) {
clusterServer.start();
}
// mock cluster manager client
try {
- Class clazz = ClusterManagerClient.class;
+ Class clazz = ClusterClient.class;
Constructor constructor = null;
constructor = clazz.getDeclaredConstructor();
constructor.setAccessible(true);
- clusterClient = (ClusterManagerClient) constructor.newInstance();
+ clusterClient = (ClusterClient) constructor.newInstance();
clusterClient.start(metaKey);
} catch (NoSuchMethodException | InstantiationException
| IllegalAccessException | InvocationTargetException e) {
@@ -127,7 +127,7 @@ public class ClusterMultiNodeTest {
if (null != clusterClient) {
clusterClient.shutdown();
}
- for (ClusterManagerServer clusterServer : clusterServers) {
+ for (ClusterServer clusterServer : clusterServers) {
clusterServer.shutdown();
}
LOG.info("ClusterMultiNodeTest::stopCluster <<<");
@@ -135,7 +135,7 @@ public class ClusterMultiNodeTest {
static boolean clusterIsStartup() {
boolean foundLeader = false;
- for (ClusterManagerServer clusterServer : clusterServers) {
+ for (ClusterServer clusterServer : clusterServers) {
if (!clusterServer.raftInitialized()) {
LOG.warn("clusterServer not Initialized!");
return false;
diff --git a/submarine-commons/pom.xml b/submarine-commons/pom.xml
index e306c3e..b985d4c 100644
--- a/submarine-commons/pom.xml
+++ b/submarine-commons/pom.xml
@@ -24,6 +24,7 @@
<groupId>org.apache.submarine</groupId>
<artifactId>submarine</artifactId>
<version>0.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
diff --git a/submarine-workbench/interpreter/interpreter-engine/pom.xml b/submarine-workbench/interpreter/interpreter-engine/pom.xml
index 29a0344..70344d7 100644
--- a/submarine-workbench/interpreter/interpreter-engine/pom.xml
+++ b/submarine-workbench/interpreter/interpreter-engine/pom.xml
@@ -37,6 +37,18 @@
<dependencies>
<dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>commons-utils</artifactId>
+ <version>${submarine.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.submarine</groupId>
+ <artifactId>commons-cluster</artifactId>
+ <version>${submarine.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${zeppelin.version}</version>
@@ -135,7 +147,7 @@
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<configuration>
- <skip>false</skip>
+ <skip>true</skip>
</configuration>
</plugin>
</plugins>
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
new file mode 100644
index 0000000..9681ef4
--- /dev/null
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.interpreter;
+
+public interface Interpreter {
+ void shutdown();
+ boolean isRunning();
+ void open();
+ InterpreterResult interpret(String code);
+ void close();
+ void cancel();
+ int getProgress();
+ boolean test();
+}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
index b60582b..67969fe 100644
--- a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
@@ -19,8 +19,12 @@
package org.apache.submarine.interpreter;
import org.apache.commons.lang.StringUtils;
+import org.apache.submarine.commons.cluster.ClusterClient;
+import org.apache.submarine.commons.cluster.meta.ClusterMeta;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.misc.Signal;
@@ -31,44 +35,47 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.SocketException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.net.UnknownHostException;
+import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
/**
* Entry point for Submarine Interpreter process.
* Accepting thrift connections from Submarine Workbench Server.
*/
-public abstract class InterpreterProcess {
- public abstract void open();
- public abstract InterpreterResult interpret(String code);
- public abstract void close();
- public abstract void cancel();
- public abstract int getProgress();
- public abstract boolean test();
-
+public class InterpreterProcess extends Thread implements Interpreter {
private static final Logger LOG = LoggerFactory.getLogger(InterpreterProcess.class);
- protected static String interpreterId;
+ // cluster manager client
+ private ClusterClient clusterClient = ClusterClient.getInstance();
+
+ private SubmarineConfiguration sconf = SubmarineConfiguration.create();
+
+ protected String interpreterId;
+
+ private InterpreterProcess interpreterProcess;
+
+ private AtomicBoolean isRunning = new AtomicBoolean(false);
public static void main(String[] args) throws InterruptedException, IOException {
- String interpreterName = args[0];
+ String interpreterType = args[0];
String interpreterId = args[1];
- String onlyTest = "";
- if (args.length == 3) {
- onlyTest = args[2];
+ Boolean onlyTest = false;
+ if (args.length == 3 && StringUtils.equals(args[2], "test")) {
+ onlyTest = true;
}
- InterpreterProcess interpreterProcess = loadInterpreterPlugin(interpreterName);
-
- if (StringUtils.equals(onlyTest, "test")) {
- boolean testResult = interpreterProcess.test();
- LOG.info("Interpreter test result: {}", testResult);
- System.exit(0);
- return;
- }
+ InterpreterProcess interpreterProcess = new InterpreterProcess(interpreterType, interpreterId, onlyTest);
+ interpreterProcess.start();
// add signal handler
Signal.handle(new Signal("TERM"), new SignalHandler() {
@@ -79,22 +86,83 @@ public abstract class InterpreterProcess {
}
});
+ interpreterProcess.join();
System.exit(0);
}
+ @Override
+ public void run() {
+ isRunning.set(true);
+ while (isRunning.get()) {
+ try {
+ // TODO(Xun Liu): Next PR will add Thrift Server in here
+ LOG.info("Mock TServer run ...");
+ sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ public boolean isRunning() {
+ return isRunning.get();
+ }
+
+ public InterpreterProcess() { }
+
+ public InterpreterProcess(String interpreterType, String interpreterId, Boolean onlyTest)
+ throws IOException {
+ this.interpreterId = interpreterId;
+ this.interpreterProcess = loadInterpreterPlugin(interpreterType);
+
+ if (true == onlyTest) {
+ boolean testResult = interpreterProcess.test();
+ LOG.info("Interpreter test result: {}", testResult);
+ System.exit(0);
+ return;
+ }
+
+ this.clusterClient.start(interpreterId);
+ putClusterMeta();
+ }
+
+ // Submit interpreter process metadata information to cluster metadata
+ private void putClusterMeta() {
+ if (!sconf.workbenchIsClusterMode()){
+ return;
+ }
+ String nodeName = clusterClient.getClusterNodeName();
+ String host = null;
+ try {
+ host = RemoteInterpreterUtils.findAvailableHostAddress();
+ } catch (UnknownHostException | SocketException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ // commit interpreter meta
+ HashMap<String, Object> meta = new HashMap<>();
+ meta.put(ClusterMeta.NODE_NAME, nodeName);
+ meta.put(ClusterMeta.INTP_PROCESS_NAME, this.interpreterId);
+ meta.put(ClusterMeta.INTP_TSERVER_HOST, host);
+ meta.put(ClusterMeta.INTP_START_TIME, LocalDateTime.now());
+ meta.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
+ meta.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
+
+ clusterClient.putClusterMeta(INTP_PROCESS_META, this.interpreterId, meta);
+ }
+
// get super interpreter class name
- private static String getSuperInterpreterClassName(String intpName) {
+ private String getSuperInterpreterClassName(String intpName) {
String superIntpClassName = "";
if (StringUtils.equals(intpName, "python")) {
superIntpClassName = "org.apache.submarine.interpreter.PythonInterpreter";
} else {
- LOG.error("Error interpreter name : {}!", intpName);
+ superIntpClassName = "org.apache.submarine.interpreter.InterpreterProcess";
}
return superIntpClassName;
}
- public static synchronized InterpreterProcess loadInterpreterPlugin(String pluginName)
+ public synchronized InterpreterProcess loadInterpreterPlugin(String pluginName)
throws IOException {
LOG.info("Loading Plug name: {}", pluginName);
@@ -110,7 +178,7 @@ public abstract class InterpreterProcess {
return intpProcess;
}
- private static InterpreterProcess loadPluginFromClassPath(
+ private InterpreterProcess loadPluginFromClassPath(
String pluginClassName, URLClassLoader pluginClassLoader) {
InterpreterProcess intpProcess = null;
try {
@@ -143,7 +211,7 @@ public abstract class InterpreterProcess {
}
// Get the class load from the specified path
- private static URLClassLoader getPluginClassLoader(String pluginsDir, String pluginName)
+ private URLClassLoader getPluginClassLoader(String pluginsDir, String pluginName)
throws IOException {
File pluginFolder = new File(pluginsDir + "/" + pluginName);
if (!pluginFolder.exists() || pluginFolder.isFile()) {
@@ -183,4 +251,42 @@ public abstract class InterpreterProcess {
.setInterpreterOut(new InterpreterOutput(null))
.build();
}
+
+ @Override
+ public void shutdown() {
+ isRunning.set(false);
+ }
+
+ @Override
+ public void open() {
+ LOG.error("Please implement the open() method of the child class!");
+ }
+
+ @Override
+ public InterpreterResult interpret(String code) {
+ LOG.error("Please implement the interpret() method of the child class!");
+ return null;
+ }
+
+ @Override
+ public void close() {
+ LOG.error("Please implement the close() method of the child class!");
+ }
+
+ @Override
+ public void cancel() {
+ LOG.error("Please implement the cancel() method of the child class!");
+ }
+
+ @Override
+ public int getProgress() {
+ LOG.error("Please implement the getProgress() method of the child class!");
+ return 0;
+ }
+
+ @Override
+ public boolean test() {
+ LOG.error("Please implement the test() method of the child class!");
+ return false;
+ }
}
diff --git a/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties b/submarine-workbench/interpreter/interpreter-engine/src/main/resources/log4j.properties
similarity index 100%
copy from submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties
copy to submarine-workbench/interpreter/interpreter-engine/src/main/resources/log4j.properties
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml b/submarine-workbench/interpreter/python-interpreter/pom.xml
index 2949c21..0ba09a2 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/python-interpreter/pom.xml
@@ -82,6 +82,10 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-annotations</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -129,6 +133,18 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -151,6 +167,12 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
diff --git a/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java b/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
index 1a19e83..e413684 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
+++ b/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
@@ -108,10 +108,11 @@ public class PythonInterpreter extends InterpreterProcess {
open();
String code = "1 + 1";
InterpreterResult result = interpret(code);
- LOG.info("Execution Python Interpreter, Calculation formula {}, Result = {}", code, result);
+ LOG.info("Execution Python Interpreter, Calculation formula {}, Result = {}",
+ code, result.message().get(0).getData());
- if (result.code() == InterpreterResult.Code.SUCCESS) {
- return true;
+ if (result.code() != InterpreterResult.Code.SUCCESS) {
+ return false;
}
if (StringUtils.equals(result.message().get(0).getData(), "2\n")) {
return true;
diff --git a/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties b/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties
index feb1e99..4725615 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties
+++ b/submarine-workbench/interpreter/python-interpreter/src/main/resources/log4j.properties
@@ -16,7 +16,7 @@
#
# Root logger option
-log4j.rootLogger=DEBUG, stdout
+log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
diff --git a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
new file mode 100644
index 0000000..247cb09
--- /dev/null
+++ b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.apache.submarine.commons.cluster.ClusterClient;
+import org.apache.submarine.commons.cluster.ClusterServer;
+import org.apache.submarine.commons.utils.NetUtils;
+import org.apache.submarine.commons.utils.SubmarineConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.SERVER_META;
+import static org.junit.Assert.assertEquals;
+
+public class InterpreterClusterTest {
+ private static Logger LOG = LoggerFactory.getLogger(InterpreterClusterTest.class);
+ private static SubmarineConfiguration sconf;
+
+ private static ClusterServer clusterServer = null;
+ private static ClusterClient clusterClient = null;
+
+ static String serverHost;
+ static int serverPort;
+ static final String clientMetaKey = "InterpreterProcessTest";
+
+ @BeforeClass
+ public static void startCluster() throws IOException, InterruptedException {
+ LOG.info("startCluster >>>");
+
+ sconf = SubmarineConfiguration.create();
+
+ // Set the cluster IP and port
+ serverHost = NetUtils.findAvailableHostAddress();
+ serverPort = NetUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ sconf.setClusterAddress(serverHost + ":" + serverPort);
+
+ // mock cluster manager server
+ clusterServer = ClusterServer.getInstance();
+ clusterServer.start();
+
+ // mock cluster manager client
+ try {
+ Class clazz = ClusterClient.class;
+ Constructor constructor = null;
+ constructor = clazz.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ clusterClient = (ClusterClient) constructor.newInstance();
+ clusterClient.start(clientMetaKey);
+ } catch (NoSuchMethodException | InstantiationException
+ | IllegalAccessException | InvocationTargetException e) {
+ LOG.error(e.getMessage(), e);
+ }
+
+ // Waiting for cluster startup
+ int wait = 0;
+ while (wait++ < 100) {
+ if (clusterServer.isClusterLeader()
+ && clusterServer.raftInitialized()
+ && clusterClient.raftInitialized()) {
+ LOG.info("wait {}(ms) found cluster leader", wait * 3000);
+ break;
+ }
+ Thread.sleep(3000);
+ }
+ Thread.sleep(3000);
+ assertEquals(true, clusterServer.isClusterLeader()
+ && clusterServer.raftInitialized()
+ && clusterClient.raftInitialized());
+ LOG.info("startCluster <<<");
+ }
+
+ @AfterClass
+ public static void stopCluster() {
+ if (null != clusterClient) {
+ clusterClient.shutdown();
+ }
+ if (null != clusterClient) {
+ clusterServer.shutdown();
+ }
+ LOG.info("stopCluster");
+ }
+
+ @Test
+ public void testInterpreterProcess() throws IOException, InterruptedException {
+ InterpreterProcess interpreterProcess = new InterpreterProcess("python", "testInterpreterProcess", false);
+
+ startInterpreterProcess(interpreterProcess, 5000);
+
+ // Get metadata for all services
+ HashMap<String, HashMap<String, Object>> serverMeta
+ = clusterClient.getClusterMeta(SERVER_META, clusterClient.getClusterNodeName());
+ LOG.info("serverMeta.size = {}", serverMeta.size());
+ assertEquals(serverMeta.size(), 1);
+
+ // get IntpProcess Meta
+ HashMap<String, HashMap<String, Object>> intpMeta
+ = clusterClient.getClusterMeta(INTP_PROCESS_META, "testInterpreterProcess");
+ LOG.info("intpMeta.size = {}", intpMeta.size());
+ assertEquals(intpMeta.size(), 1);
+
+ HashMap<String, HashMap<String, Object>> intpMeta2
+ = clusterClient.getClusterMeta(INTP_PROCESS_META, clientMetaKey);
+ LOG.info("intpMeta2.size = {}", intpMeta2.size());
+ assertEquals(intpMeta2.size(), 1);
+
+ stopInterpreterProcess(interpreterProcess, 5000);
+ }
+
+ private void startInterpreterProcess(InterpreterProcess interpreterProcess, int timeout)
+ throws InterruptedException, IOException {
+ interpreterProcess.start();
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < timeout) {
+ if (interpreterProcess.isRunning()) {
+ break;
+ }
+ Thread.sleep(200);
+ }
+ assertEquals(true, interpreterProcess.isRunning());
+ }
+
+ private void stopInterpreterProcess(InterpreterProcess interpreterProcess, int timeout)
+ throws InterruptedException {
+ interpreterProcess.shutdown();
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < timeout) {
+ if (!interpreterProcess.isRunning()) {
+ break;
+ }
+ Thread.sleep(200);
+ }
+ assertEquals(false, interpreterProcess.isRunning());
+ }
+}
diff --git a/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java b/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
index 01c937f..998d835 100644
--- a/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
+++ b/submarine-workbench/workbench-server/src/main/java/org/apache/submarine/server/WorkbenchServer.java
@@ -20,7 +20,7 @@ package org.apache.submarine.server;
import org.apache.log4j.PropertyConfigurator;
import org.apache.submarine.websocket.NotebookServer;
-import org.apache.submarine.commons.cluster.ClusterManagerServer;
+import org.apache.submarine.commons.cluster.ClusterServer;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -92,8 +92,8 @@ public class WorkbenchServer extends ResourceConfig {
// Notebook server
setupNotebookServer(webApp, conf, sharedServiceLocator);
- // Cluster Manager Server
- setupClusterManagerServer();
+ // Cluster Server
+ setupClusterServer();
startServer();
}
@@ -222,10 +222,10 @@ public class WorkbenchServer extends ResourceConfig {
webapp.addServlet(servletHolder, "/ws/*");
}
- private static void setupClusterManagerServer() {
+ private static void setupClusterServer() {
if (conf.workbenchIsClusterMode()) {
- ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance();
- clusterManagerServer.start();
+ ClusterServer clusterServer = ClusterServer.getInstance();
+ clusterServer.start();
}
}
diff --git a/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java b/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java
index 7742451..d4c748d 100644
--- a/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java
+++ b/submarine-workbench/workbench-server/src/test/java/org/apache/submarine/server/WorkbenchClusterServerTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.submarine.server;
-import org.apache.submarine.commons.cluster.ClusterManagerClient;
+import org.apache.submarine.commons.cluster.ClusterClient;
import org.apache.submarine.commons.cluster.meta.ClusterMetaType;
import org.apache.submarine.commons.utils.NetUtils;
import org.apache.submarine.commons.utils.SubmarineConfiguration;
@@ -39,7 +39,7 @@ import static org.junit.Assert.assertTrue;
public class WorkbenchClusterServerTest {
private static final Logger LOG = LoggerFactory.getLogger(WorkbenchClusterServerTest.class);
- private static ClusterManagerClient clusterClient = null;
+ private static ClusterClient clusterClient = null;
@BeforeClass
public static void start() throws Exception {
@@ -55,11 +55,11 @@ public class WorkbenchClusterServerTest {
AbstractWorkbenchServerTest.startUp(WorkbenchClusterServerTest.class.getSimpleName());
// Mock Cluster client
- Class clazz = ClusterManagerClient.class;
+ Class clazz = ClusterClient.class;
Constructor constructor = null;
constructor = clazz.getDeclaredConstructor();
constructor.setAccessible(true);
- clusterClient = (ClusterManagerClient) constructor.newInstance();
+ clusterClient = (ClusterClient) constructor.newInstance();
clusterClient.start("TestWorkbenchClusterServer");
// Waiting for cluster startup