You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2022/11/10 11:57:23 UTC
[zeppelin] branch master updated: [ZEPPELIN-5845] Cluster polish (#4505)
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new aa4731e699 [ZEPPELIN-5845] Cluster polish (#4505)
aa4731e699 is described below
commit aa4731e69916ed20a3aad56a89544d7a08d57d24
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Thu Nov 10 12:57:13 2022 +0100
[ZEPPELIN-5845] Cluster polish (#4505)
---
.../apache/zeppelin/cluster/ClusterManager.java | 16 +++----
.../zeppelin/cluster/ClusterManagerServer.java | 11 ++---
.../apache/zeppelin/cluster/ClusterMonitor.java | 4 +-
.../apache/zeppelin/cluster/meta/ClusterMeta.java | 49 ++++++++++------------
.../zeppelin/cluster/meta/ClusterMetaEntity.java | 7 ++--
.../zeppelin/cluster/ClusterMultiNodeTest.java | 19 ++++-----
.../zeppelin/cluster/ClusterSingleNodeTest.java | 21 +++++-----
.../launcher/ClusterInterpreterCheckThread.java | 6 +--
.../launcher/ClusterInterpreterLauncher.java | 12 +++---
.../launcher/ClusterInterpreterLauncherTest.java | 3 +-
.../interpreter/launcher/ClusterMockTest.java | 18 ++++----
.../org/apache/zeppelin/rest/ClusterRestApi.java | 17 ++++----
.../ClusterIntpSettingEventListenerTest.java | 2 +-
.../cluster/ClusterNoteEventListenerTest.java | 1 -
14 files changed, 92 insertions(+), 94 deletions(-)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
index 38bb832334..6e586afb7f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -344,7 +344,7 @@ public abstract class ClusterManager {
ClusterMetaType metaType = entity.getMetaType();
String metaKey = entity.getKey();
- HashMap<String, Object> newMetaValue = entity.getValues();
+ Map<String, Object> newMetaValue = entity.getValues();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("putClusterMeta {} {}", metaType, metaKey);
@@ -361,7 +361,7 @@ public abstract class ClusterManager {
}
// put metadata into cluster metadata
- public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, Object> values) {
+ public void putClusterMeta(ClusterMetaType type, String key, Map<String, Object> values) {
ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, key, values);
boolean result = putClusterMeta(metaEntity);
@@ -407,9 +407,9 @@ public abstract class ClusterManager {
}
// get metadata by cluster metadata
- public HashMap<String, HashMap<String, Object>> getClusterMeta(
+ public Map<String, Map<String, Object>> getClusterMeta(
ClusterMetaType metaType, String metaKey) {
- HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
+ Map<String, Map<String, Object>> clusterMeta = new HashMap<>();
if (!raftInitialized()) {
LOGGER.error("Raft incomplete initialization!");
return clusterMeta;
@@ -434,7 +434,7 @@ public abstract class ClusterManager {
}
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString());
+ LOGGER.debug("getClusterMeta >>> {}", clusterMeta);
}
return clusterMeta;
@@ -442,12 +442,12 @@ public abstract class ClusterManager {
public InterpreterClient getIntpProcessStatus(String intpName,
int timeout,
- ClusterCallback<HashMap<String, Object>> callback) {
+ ClusterCallback<Map<String, Object>> callback) {
final int CHECK_META_INTERVAL = 1000;
int MAX_RETRY_GET_META = timeout / CHECK_META_INTERVAL;
int retryGetMeta = 0;
while (retryGetMeta++ < MAX_RETRY_GET_META) {
- HashMap<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META, intpName).get(intpName);
+ Map<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META, intpName).get(intpName);
if (interpreterMetaOnline(intpMeta)) {
// connect exist Interpreter Process
String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
@@ -485,7 +485,7 @@ public abstract class ClusterManager {
}
// Check if the interpreter is online
- private boolean interpreterMetaOnline(HashMap<String, Object> intpProcMeta) {
+ private boolean interpreterMetaOnline(Map<String, Object> intpProcMeta) {
if (null != intpProcMeta
&& intpProcMeta.containsKey(INTP_TSERVER_HOST)
&& intpProcMeta.containsKey(INTP_TSERVER_PORT)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index 11e56029df..b394b25f25 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -90,6 +90,7 @@ public class ClusterManagerServer extends ClusterManager {
}
}
+ @Override
public void start() {
if (!zConf.isClusterMode()) {
return;
@@ -262,13 +263,13 @@ public class ClusterManagerServer extends ClusterManager {
}
// Obtain the server node whose resources are idle in the cluster
- public HashMap<String, Object> getIdleNodeMeta() {
- HashMap<String, Object> idleNodeMeta = null;
- HashMap<String, HashMap<String, Object>> clusterMeta = getClusterMeta(SERVER_META, "");
+ public Map<String, Object> getIdleNodeMeta() {
+ Map<String, Object> idleNodeMeta = null;
+ Map<String, Map<String, Object>> clusterMeta = getClusterMeta(SERVER_META, "");
long memoryIdle = 0;
- for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
- HashMap<String, Object> meta = entry.getValue();
+ for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) {
+ Map<String, Object> meta = entry.getValue();
// Check if the service or process is offline
String status = (String) meta.get(ClusterMeta.STATUS);
if (null == status || StringUtils.isEmpty(status)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
index 2f0b60c4c0..133f8b6910 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
@@ -136,14 +136,14 @@ public class ClusterMonitor {
LocalDateTime now = LocalDateTime.now();
// check machine mate
for (ClusterMetaType metaType : ClusterMetaType.values()) {
- Map<String, HashMap<String, Object>> clusterMeta
+ Map<String, Map<String, Object>> clusterMeta
= clusterManager.getClusterMeta(metaType, "");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("clusterMeta : {}", clusterMeta);
}
- for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
+ for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) {
String key = entry.getKey();
Map<String, Object> meta = entry.getValue();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
index 4e8a2767f1..95162bb225 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -16,7 +16,6 @@
*/
package org.apache.zeppelin.cluster.meta;
-import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,36 +28,36 @@ import java.util.Map;
* Metadata stores metadata information in a KV key-value pair
*/
public class ClusterMeta implements Serializable {
- private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusterMeta.class);
// The name of each server node in the cluster
- public static String NODE_NAME = "NODE_NAME";
+ public static final String NODE_NAME = "NODE_NAME";
// zeppelin-server meta
- public static String SERVER_HOST = "SERVER_HOST";
- public static String SERVER_PORT = "SERVER_PORT";
- public static String SERVER_START_TIME = "SERVER_START_TIME";
+ public static final String SERVER_HOST = "SERVER_HOST";
+ public static final String SERVER_PORT = "SERVER_PORT";
+ public static final String SERVER_START_TIME = "SERVER_START_TIME";
// interperter-process meta
- public static String INTP_PROCESS_NAME = "INTP_PROCESS_NAME";
- public static String INTP_TSERVER_HOST = "INTP_TSERVER_HOST";
- public static String INTP_TSERVER_PORT = "INTP_TSERVER_PORT";
- public static String INTP_START_TIME = "INTP_START_TIME";
+ public static final String INTP_PROCESS_NAME = "INTP_PROCESS_NAME";
+ public static final String INTP_TSERVER_HOST = "INTP_TSERVER_HOST";
+ public static final String INTP_TSERVER_PORT = "INTP_TSERVER_PORT";
+ public static final String INTP_START_TIME = "INTP_START_TIME";
// zeppelin-server resource usage
- public static String CPU_CAPACITY = "CPU_CAPACITY";
- public static String CPU_USED = "CPU_USED";
- public static String MEMORY_CAPACITY = "MEMORY_CAPACITY";
- public static String MEMORY_USED = "MEMORY_USED";
+ public static final String CPU_CAPACITY = "CPU_CAPACITY";
+ public static final String CPU_USED = "CPU_USED";
+ public static final String MEMORY_CAPACITY = "MEMORY_CAPACITY";
+ public static final String MEMORY_USED = "MEMORY_USED";
- public static String LATEST_HEARTBEAT = "LATEST_HEARTBEAT";
+ public static final String LATEST_HEARTBEAT = "LATEST_HEARTBEAT";
// zeppelin-server or interperter-process status
- public static String STATUS = "STATUS";
- public static String ONLINE_STATUS = "ONLINE";
- public static String OFFLINE_STATUS = "OFFLINE";
- public static String INTP_PROCESS_COUNT = "INTP_PROCESS_COUNT";
- public static String INTP_PROCESS_LIST = "INTP_PROCESS_LIST";
+ public static final String STATUS = "STATUS";
+ public static final String ONLINE_STATUS = "ONLINE";
+ public static final String OFFLINE_STATUS = "OFFLINE";
+ public static final String INTP_PROCESS_COUNT = "INTP_PROCESS_COUNT";
+ public static final String INTP_PROCESS_LIST = "INTP_PROCESS_LIST";
// cluster_name = host:port
// Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
@@ -67,8 +66,6 @@ public class ClusterMeta implements Serializable {
// Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
private Map<String, Map<String, Object>> mapInterpreterMeta = new HashMap<>();
- public static Gson gson = new Gson();
-
public void put(ClusterMetaType type, String key, Object value) {
Map<String, Object> mapValue = (Map<String, Object>) value;
@@ -104,7 +101,7 @@ public class ClusterMeta implements Serializable {
if (mapServerMeta.containsKey(key)) {
values = mapServerMeta.get(key);
} else {
- logger.warn("can not find key : {}", key);
+ LOGGER.warn("can not find key : {}", key);
}
break;
case INTP_PROCESS_META:
@@ -114,7 +111,7 @@ public class ClusterMeta implements Serializable {
if (mapInterpreterMeta.containsKey(key)) {
values = mapInterpreterMeta.get(key);
} else {
- logger.warn("can not find key : {}", key);
+ LOGGER.warn("can not find key : {}", key);
}
break;
}
@@ -131,14 +128,14 @@ public class ClusterMeta implements Serializable {
if (mapServerMeta.containsKey(key)) {
return mapServerMeta.remove(key);
} else {
- logger.warn("can not find key : {}", key);
+ LOGGER.warn("can not find key : {}", key);
}
break;
case INTP_PROCESS_META:
if (mapInterpreterMeta.containsKey(key)) {
return mapInterpreterMeta.remove(key);
} else {
- logger.warn("can not find key : {}", key);
+ LOGGER.warn("can not find key : {}", key);
}
break;
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
index 7a5afb013b..fab1333dfe 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
@@ -18,6 +18,7 @@ package org.apache.zeppelin.cluster.meta;
import java.io.Serializable;
import java.util.HashMap;
+import java.util.Map;
/**
* Cluster operations, cluster types, encapsulation objects for keys and values
@@ -26,10 +27,10 @@ public class ClusterMetaEntity implements Serializable {
private ClusterMetaOperation operation;
private ClusterMetaType type;
private String key;
- private HashMap<String, Object> values = new HashMap<>();
+ private Map<String, Object> values = new HashMap<>();
public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
- String key, HashMap<String, Object> values) {
+ String key, Map<String, Object> values) {
this.operation = operation;
this.type = type;
this.key = key;
@@ -51,7 +52,7 @@ public class ClusterMetaEntity implements Serializable {
return key;
}
- public HashMap<String, Object> getValues() {
+ public Map<String, Object> getValues() {
return values;
}
}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
index c4e88b0d7e..f5c79421a5 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
@@ -21,18 +21,18 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class ClusterMultiNodeTest {
private static Logger LOGGER = LoggerFactory.getLogger(ClusterMultiNodeTest.class);
@@ -68,10 +68,10 @@ public class ClusterMultiNodeTest {
String clusterHost = parts[0];
int clusterPort = Integer.valueOf(parts[1]);
- Class clazz = ClusterManagerServer.class;
- Constructor constructor = clazz.getDeclaredConstructor();
+ Class<ClusterManagerServer> clazz = ClusterManagerServer.class;
+ Constructor<ClusterManagerServer> constructor = clazz.getDeclaredConstructor();
constructor.setAccessible(true);
- ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance();
+ ClusterManagerServer clusterServer = constructor.newInstance();
clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
clusterServers.add(clusterServer);
@@ -145,17 +145,16 @@ public class ClusterMultiNodeTest {
public static void getClusterServerMeta() {
LOGGER.info("getClusterServerMeta >>>");
// Get metadata for all services
- Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+ Map<String, Map<String, Object>> srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
LOGGER.info(srvMeta.toString());
- Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+ Map<String, Map<String, Object>> intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
LOGGER.info(intpMeta.toString());
assertNotNull(srvMeta);
- assertEquals(true, (srvMeta instanceof HashMap));
- HashMap hashMap = (HashMap) srvMeta;
+ assertTrue(srvMeta instanceof Map);
- assertEquals(hashMap.size(), 3);
+ assertEquals(3, srvMeta.size());
LOGGER.info("getClusterServerMeta <<<");
}
}
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
index b6bb92102c..26d7bf9eb6 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
@@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class ClusterSingleNodeTest {
private static Logger LOGGER = LoggerFactory.getLogger(ClusterSingleNodeTest.class);
@@ -98,22 +100,19 @@ public class ClusterSingleNodeTest {
LOGGER.info("getServerMeta >>>");
// Get metadata for all services
- Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+ Map<String, Map<String, Object>> meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
LOGGER.info(meta.toString());
- Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+ Map<String, Map<String, Object>> intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
LOGGER.info(intpMeta.toString());
assertNotNull(meta);
- assertEquals(true, (meta instanceof HashMap));
- HashMap hashMap = (HashMap) meta;
+ assertTrue(meta instanceof Map);
// Get metadata for the current service
- Object values = hashMap.get(clusterClient.getClusterNodeName());
- assertEquals(true, (values instanceof HashMap));
- HashMap mapMetaValues = (HashMap) values;
-
- assertEquals(true, mapMetaValues.size()>0);
+ Map<String, Object> values = meta.get(clusterClient.getClusterNodeName());
+ assertTrue(values instanceof Map);
+ assertTrue(values.size() > 0);
LOGGER.info("getServerMeta <<<");
}
@@ -121,7 +120,7 @@ public class ClusterSingleNodeTest {
@Test
public void putIntpProcessMeta() {
// mock IntpProcess Meta
- HashMap<String, Object> meta = new HashMap<>();
+ Map<String, Object> meta = new HashMap<>();
meta.put(ClusterMeta.SERVER_HOST, zServerHost);
meta.put(ClusterMeta.SERVER_PORT, zServerPort);
meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
@@ -135,7 +134,7 @@ public class ClusterSingleNodeTest {
clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta);
// get IntpProcess Meta
- HashMap<String, HashMap<String, Object>> check
+ Map<String, Map<String, Object>> check
= clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey);
LOGGER.info(check.toString());
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
index ef7bc68e8c..f428bc2346 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
@@ -23,7 +23,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
+import java.util.Map;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
@@ -54,9 +54,9 @@ public class ClusterInterpreterCheckThread extends Thread {
ZeppelinConfiguration.create());
clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout,
- new ClusterCallback<HashMap<String, Object>>() {
+ new ClusterCallback<Map<String, Object>>() {
@Override
- public InterpreterClient online(HashMap<String, Object> result) {
+ public InterpreterClient online(Map<String, Object> result) {
String intpTSrvHost = (String) result.get(INTP_TSERVER_HOST);
int intpTSrvPort = (int) result.get(INTP_TSERVER_PORT);
LOGGER.info("Found cluster interpreter {}:{}", intpTSrvHost, intpTSrvPort);
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index 397f6ab515..b34be06230 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -61,7 +61,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
@Override
public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
- LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+ LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
this.context = context;
this.properties = context.getProperties();
@@ -70,9 +70,9 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
// connect exist Interpreter Process
InterpreterClient intpClient = clusterServer.getIntpProcessStatus(
- intpGroupId, 3000, new ClusterCallback<HashMap<String, Object>>() {
+ intpGroupId, 3000, new ClusterCallback<Map<String, Object>>() {
@Override
- public InterpreterClient online(HashMap<String, Object> result) {
+ public InterpreterClient online(Map<String, Object> result) {
String intpTserverHost = (String) result.get(INTP_TSERVER_HOST);
int intpTserverPort = (int) result.get(INTP_TSERVER_PORT);
@@ -100,7 +100,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
// No process was found for the InterpreterGroup ID
String srvHost = null;
int srvPort = 0;
- HashMap<String, Object> meta = clusterServer.getIdleNodeMeta();
+ Map<String, Object> meta = clusterServer.getIdleNodeMeta();
if (null == meta) {
LOGGER.error("Don't get idle node meta, launch interpreter on local.");
InterpreterClient clusterIntpProcess = createInterpreterProcess(context);
@@ -145,9 +145,9 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
String finalSrvHost = srvHost;
int finalSrvPort = srvPort;
intpClient = clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout,
- new ClusterCallback<HashMap<String, Object>>() {
+ new ClusterCallback<Map<String, Object>>() {
@Override
- public InterpreterClient online(HashMap<String, Object> result) {
+ public InterpreterClient online(Map<String, Object> result) {
// connect exist Interpreter Process
String intpTserverHost = (String) result.get(INTP_TSERVER_HOST);
int intpTserverPort = (int) result.get(INTP_TSERVER_PORT);
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
index 995da7ecd9..743f800ce9 100644
--- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -33,7 +33,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ClusterInterpreterLauncherTest extends ClusterMockTest {
- private static Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncherTest.class);
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ClusterInterpreterLauncherTest.class);
@BeforeClass
public static void startTest() throws IOException, InterruptedException {
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
index dfacfa0c46..a6be913257 100644
--- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.HashMap;
+import java.util.Map;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.OFFLINE_STATUS;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
@@ -112,27 +113,26 @@ public class ClusterMockTest {
LOGGER.info("serverMeta >>>");
// Get metadata for all services
- Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+ Map<String, Map<String, Object>> meta =
+ clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
LOGGER.info(meta.toString());
assertNotNull(meta);
- assertEquals(true, (meta instanceof HashMap));
- HashMap hashMap = (HashMap) meta;
+ assertEquals(true, (meta instanceof Map));
// Get metadata for the current service
- Object values = hashMap.get(zServerHost + ":" + zServerPort);
- assertEquals(true, (values instanceof HashMap));
- HashMap mapMetaValues = (HashMap) values;
+ Map<String, Object> values = meta.get(zServerHost + ":" + zServerPort);
+ assertEquals(true, (values instanceof Map));
- assertEquals(true, mapMetaValues.size() > 0);
+ assertEquals(true, values.size() > 0);
LOGGER.info("serverMeta <<<");
}
public void mockIntpProcessMeta(String metaKey, boolean online) {
// mock IntpProcess Meta
- HashMap<String, Object> meta = new HashMap<>();
+ Map<String, Object> meta = new HashMap<>();
meta.put(ClusterMeta.SERVER_HOST, "127.0.0.1");
meta.put(ClusterMeta.SERVER_PORT, 6000);
meta.put(ClusterMeta.INTP_TSERVER_HOST, "127.0.0.1");
@@ -152,7 +152,7 @@ public class ClusterMockTest {
clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta);
// get IntpProcess Meta
- HashMap<String, HashMap<String, Object>> check
+ Map<String, Map<String, Object>> check
= clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey);
LOGGER.info(check.toString());
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java
index e0c911a17b..31d936e077 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
/**
* clusters Rest api.
@@ -53,7 +54,7 @@ public class ClusterRestApi {
// Do not modify, Use by `zeppelin-web/src/app/cluster/cluster.html`
- private static String PROPERTIES = "properties";
+ private static final String PROPERTIES = "properties";
@Inject
public ClusterRestApi(ZeppelinConfiguration zConf) {
@@ -85,13 +86,13 @@ public class ClusterRestApi {
public Response getClusterNodes(){
List<Map<String, Object>> nodes = new ArrayList<>();
- Map<String, HashMap<String, Object>> clusterMeta;
- Map<String, HashMap<String, Object>> intpMeta;
+ Map<String, Map<String, Object>> clusterMeta;
+ Map<String, Map<String, Object>> intpMeta;
clusterMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.SERVER_META, "");
intpMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
// Number of calculation processes
- for (Map.Entry<String, HashMap<String, Object>> serverMetaEntity : clusterMeta.entrySet()) {
+ for (Entry<String, Map<String, Object>> serverMetaEntity : clusterMeta.entrySet()) {
if (!serverMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME)) {
continue;
}
@@ -99,7 +100,7 @@ public class ClusterRestApi {
List<String> arrIntpProcess = new ArrayList<>();
int intpProcCount = 0;
- for (Map.Entry<String, HashMap<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
+ for (Map.Entry<String, Map<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
if (!intpMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME)
&& !intpMetaEntity.getValue().containsKey(ClusterMeta.INTP_PROCESS_NAME)) {
continue;
@@ -116,7 +117,7 @@ public class ClusterRestApi {
serverMetaEntity.getValue().put(ClusterMeta.INTP_PROCESS_LIST, arrIntpProcess);
}
- for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
+ for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) {
String nodeName = entry.getKey();
Map<String, Object> properties = entry.getValue();
@@ -197,11 +198,11 @@ public class ClusterRestApi {
@PathParam("intpName") String intpName){
List<Map<String, Object>> intpProcesses = new ArrayList<>();
- Map<String, HashMap<String, Object>> intpMeta = clusterManagerServer.getClusterMeta(
+ Map<String, Map<String, Object>> intpMeta = clusterManagerServer.getClusterMeta(
ClusterMetaType.INTP_PROCESS_META, "");
// Number of calculation processes
- for (Map.Entry<String, HashMap<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
+ for (Map.Entry<String, Map<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
String intpNodeName = (String) intpMetaEntity.getValue().get(ClusterMeta.NODE_NAME);
if (null != intpNodeName && intpNodeName.equals(nodeName)) {
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
index 734611f272..79e9a6ee86 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
@@ -18,7 +18,6 @@ package org.apache.zeppelin.cluster;
import org.apache.zeppelin.cluster.event.ClusterEventListener;
import org.apache.zeppelin.cluster.event.ClusterMessage;
-import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,5 +33,6 @@ public class ClusterIntpSettingEventListenerTest implements ClusterEventListener
receiveMsg = msg;
LOGGER.info("ClusterIntpSettingEventListenerTest#onClusterEvent : {}", msg);
ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+ assertNotNull(message);
}
}
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
index 0685cf9668..a34e2879d6 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.Set;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class ClusterNoteEventListenerTest implements ClusterEventListener {