You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2022/11/10 11:57:23 UTC

[zeppelin] branch master updated: [ZEPPELIN-5845] Cluster polish (#4505)

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new aa4731e699 [ZEPPELIN-5845] Cluster polish (#4505)
aa4731e699 is described below

commit aa4731e69916ed20a3aad56a89544d7a08d57d24
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Thu Nov 10 12:57:13 2022 +0100

    [ZEPPELIN-5845] Cluster polish (#4505)
---
 .../apache/zeppelin/cluster/ClusterManager.java    | 16 +++----
 .../zeppelin/cluster/ClusterManagerServer.java     | 11 ++---
 .../apache/zeppelin/cluster/ClusterMonitor.java    |  4 +-
 .../apache/zeppelin/cluster/meta/ClusterMeta.java  | 49 ++++++++++------------
 .../zeppelin/cluster/meta/ClusterMetaEntity.java   |  7 ++--
 .../zeppelin/cluster/ClusterMultiNodeTest.java     | 19 ++++-----
 .../zeppelin/cluster/ClusterSingleNodeTest.java    | 21 +++++-----
 .../launcher/ClusterInterpreterCheckThread.java    |  6 +--
 .../launcher/ClusterInterpreterLauncher.java       | 12 +++---
 .../launcher/ClusterInterpreterLauncherTest.java   |  3 +-
 .../interpreter/launcher/ClusterMockTest.java      | 18 ++++----
 .../org/apache/zeppelin/rest/ClusterRestApi.java   | 17 ++++----
 .../ClusterIntpSettingEventListenerTest.java       |  2 +-
 .../cluster/ClusterNoteEventListenerTest.java      |  1 -
 14 files changed, 92 insertions(+), 94 deletions(-)

diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
index 38bb832334..6e586afb7f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -344,7 +344,7 @@ public abstract class ClusterManager {
 
     ClusterMetaType metaType = entity.getMetaType();
     String metaKey = entity.getKey();
-    HashMap<String, Object> newMetaValue = entity.getValues();
+    Map<String, Object> newMetaValue = entity.getValues();
 
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("putClusterMeta {} {}", metaType, metaKey);
@@ -361,7 +361,7 @@ public abstract class ClusterManager {
   }
 
   // put metadata into cluster metadata
-  public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, Object> values) {
+  public void putClusterMeta(ClusterMetaType type, String key, Map<String, Object> values) {
     ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, key, values);
 
     boolean result = putClusterMeta(metaEntity);
@@ -407,9 +407,9 @@ public abstract class ClusterManager {
   }
 
   // get metadata by cluster metadata
-  public HashMap<String, HashMap<String, Object>> getClusterMeta(
+  public Map<String, Map<String, Object>> getClusterMeta(
       ClusterMetaType metaType, String metaKey) {
-    HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
+    Map<String, Map<String, Object>> clusterMeta = new HashMap<>();
     if (!raftInitialized()) {
       LOGGER.error("Raft incomplete initialization!");
       return clusterMeta;
@@ -434,7 +434,7 @@ public abstract class ClusterManager {
     }
 
     if (LOGGER.isDebugEnabled()) {
-      LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString());
+      LOGGER.debug("getClusterMeta >>> {}", clusterMeta);
     }
 
     return clusterMeta;
@@ -442,12 +442,12 @@ public abstract class ClusterManager {
 
   public InterpreterClient getIntpProcessStatus(String intpName,
                                                 int timeout,
-                                                ClusterCallback<HashMap<String, Object>> callback) {
+                                                ClusterCallback<Map<String, Object>> callback) {
     final int CHECK_META_INTERVAL = 1000;
     int MAX_RETRY_GET_META = timeout / CHECK_META_INTERVAL;
     int retryGetMeta = 0;
     while (retryGetMeta++ < MAX_RETRY_GET_META) {
-      HashMap<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META, intpName).get(intpName);
+      Map<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META, intpName).get(intpName);
       if (interpreterMetaOnline(intpMeta)) {
         // connect exist Interpreter Process
         String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
@@ -485,7 +485,7 @@ public abstract class ClusterManager {
   }
 
   // Check if the interpreter is online
-  private boolean interpreterMetaOnline(HashMap<String, Object> intpProcMeta) {
+  private boolean interpreterMetaOnline(Map<String, Object> intpProcMeta) {
     if (null != intpProcMeta
         && intpProcMeta.containsKey(INTP_TSERVER_HOST)
         && intpProcMeta.containsKey(INTP_TSERVER_PORT)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index 11e56029df..b394b25f25 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -90,6 +90,7 @@ public class ClusterManagerServer extends ClusterManager {
     }
   }
 
+  @Override
   public void start() {
     if (!zConf.isClusterMode()) {
       return;
@@ -262,13 +263,13 @@ public class ClusterManagerServer extends ClusterManager {
   }
 
   // Obtain the server node whose resources are idle in the cluster
-  public HashMap<String, Object> getIdleNodeMeta() {
-    HashMap<String, Object> idleNodeMeta = null;
-    HashMap<String, HashMap<String, Object>> clusterMeta = getClusterMeta(SERVER_META, "");
+  public Map<String, Object> getIdleNodeMeta() {
+    Map<String, Object> idleNodeMeta = null;
+    Map<String, Map<String, Object>> clusterMeta = getClusterMeta(SERVER_META, "");
 
     long memoryIdle = 0;
-    for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
-      HashMap<String, Object> meta = entry.getValue();
+    for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) {
+      Map<String, Object> meta = entry.getValue();
       // Check if the service or process is offline
       String status = (String) meta.get(ClusterMeta.STATUS);
       if (null == status || StringUtils.isEmpty(status)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
index 2f0b60c4c0..133f8b6910 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
@@ -136,14 +136,14 @@ public class ClusterMonitor {
     LocalDateTime now = LocalDateTime.now();
     // check machine mate
     for (ClusterMetaType metaType : ClusterMetaType.values()) {
-      Map<String, HashMap<String, Object>> clusterMeta
+      Map<String, Map<String, Object>> clusterMeta
           = clusterManager.getClusterMeta(metaType, "");
 
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("clusterMeta : {}", clusterMeta);
       }
 
-      for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
+      for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) {
         String key = entry.getKey();
         Map<String, Object> meta = entry.getValue();
 
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
index 4e8a2767f1..95162bb225 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -16,7 +16,6 @@
  */
 package org.apache.zeppelin.cluster.meta;
 
-import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,36 +28,36 @@ import java.util.Map;
  * Metadata stores metadata information in a KV key-value pair
  */
 public class ClusterMeta implements Serializable {
-  private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterMeta.class);
 
   // The name of each server node in the cluster
-  public static String NODE_NAME            = "NODE_NAME";
+  public static final String NODE_NAME = "NODE_NAME";
 
   // zeppelin-server meta
-  public static String SERVER_HOST          = "SERVER_HOST";
-  public static String SERVER_PORT          = "SERVER_PORT";
-  public static String SERVER_START_TIME    = "SERVER_START_TIME";
+  public static final String SERVER_HOST = "SERVER_HOST";
+  public static final String SERVER_PORT = "SERVER_PORT";
+  public static final String SERVER_START_TIME = "SERVER_START_TIME";
 
   // interperter-process meta
-  public static String INTP_PROCESS_NAME    = "INTP_PROCESS_NAME";
-  public static String INTP_TSERVER_HOST    = "INTP_TSERVER_HOST";
-  public static String INTP_TSERVER_PORT    = "INTP_TSERVER_PORT";
-  public static String INTP_START_TIME      = "INTP_START_TIME";
+  public static final String INTP_PROCESS_NAME = "INTP_PROCESS_NAME";
+  public static final String INTP_TSERVER_HOST = "INTP_TSERVER_HOST";
+  public static final String INTP_TSERVER_PORT = "INTP_TSERVER_PORT";
+  public static final String INTP_START_TIME = "INTP_START_TIME";
 
   // zeppelin-server resource usage
-  public static String CPU_CAPACITY         = "CPU_CAPACITY";
-  public static String CPU_USED             = "CPU_USED";
-  public static String MEMORY_CAPACITY      = "MEMORY_CAPACITY";
-  public static String MEMORY_USED          = "MEMORY_USED";
+  public static final String CPU_CAPACITY = "CPU_CAPACITY";
+  public static final String CPU_USED = "CPU_USED";
+  public static final String MEMORY_CAPACITY = "MEMORY_CAPACITY";
+  public static final String MEMORY_USED = "MEMORY_USED";
 
-  public static String LATEST_HEARTBEAT     = "LATEST_HEARTBEAT";
+  public static final String LATEST_HEARTBEAT = "LATEST_HEARTBEAT";
 
   // zeppelin-server or interperter-process status
-  public static String STATUS               = "STATUS";
-  public static String ONLINE_STATUS        = "ONLINE";
-  public static String OFFLINE_STATUS       = "OFFLINE";
-  public static String INTP_PROCESS_COUNT   = "INTP_PROCESS_COUNT";
-  public static String INTP_PROCESS_LIST    = "INTP_PROCESS_LIST";
+  public static final String STATUS = "STATUS";
+  public static final String ONLINE_STATUS = "ONLINE";
+  public static final String OFFLINE_STATUS = "OFFLINE";
+  public static final String INTP_PROCESS_COUNT = "INTP_PROCESS_COUNT";
+  public static final String INTP_PROCESS_LIST = "INTP_PROCESS_LIST";
 
   // cluster_name = host:port
   // Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
@@ -67,8 +66,6 @@ public class ClusterMeta implements Serializable {
   // Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
   private Map<String, Map<String, Object>> mapInterpreterMeta = new HashMap<>();
 
-  public static Gson gson = new Gson();
-
   public void put(ClusterMetaType type, String key, Object value) {
     Map<String, Object> mapValue = (Map<String, Object>) value;
 
@@ -104,7 +101,7 @@ public class ClusterMeta implements Serializable {
         if (mapServerMeta.containsKey(key)) {
           values = mapServerMeta.get(key);
         } else {
-          logger.warn("can not find key : {}", key);
+          LOGGER.warn("can not find key : {}", key);
         }
         break;
       case INTP_PROCESS_META:
@@ -114,7 +111,7 @@ public class ClusterMeta implements Serializable {
         if (mapInterpreterMeta.containsKey(key)) {
           values = mapInterpreterMeta.get(key);
         } else {
-          logger.warn("can not find key : {}", key);
+          LOGGER.warn("can not find key : {}", key);
         }
         break;
     }
@@ -131,14 +128,14 @@ public class ClusterMeta implements Serializable {
         if (mapServerMeta.containsKey(key)) {
           return mapServerMeta.remove(key);
         } else {
-          logger.warn("can not find key : {}", key);
+          LOGGER.warn("can not find key : {}", key);
         }
         break;
       case INTP_PROCESS_META:
         if (mapInterpreterMeta.containsKey(key)) {
           return mapInterpreterMeta.remove(key);
         } else {
-          logger.warn("can not find key : {}", key);
+          LOGGER.warn("can not find key : {}", key);
         }
         break;
     }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
index 7a5afb013b..fab1333dfe 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMetaEntity.java
@@ -18,6 +18,7 @@ package org.apache.zeppelin.cluster.meta;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Cluster operations, cluster types, encapsulation objects for keys and values
@@ -26,10 +27,10 @@ public class ClusterMetaEntity implements Serializable {
   private ClusterMetaOperation operation;
   private ClusterMetaType type;
   private String key;
-  private HashMap<String, Object> values = new HashMap<>();
+  private Map<String, Object> values = new HashMap<>();
 
   public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
-                           String key, HashMap<String, Object> values) {
+                           String key, Map<String, Object> values) {
     this.operation = operation;
     this.type = type;
     this.key = key;
@@ -51,7 +52,7 @@ public class ClusterMetaEntity implements Serializable {
     return key;
   }
 
-  public HashMap<String, Object> getValues() {
+  public Map<String, Object> getValues() {
     return values;
   }
 }
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
index c4e88b0d7e..f5c79421a5 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterMultiNodeTest.java
@@ -21,18 +21,18 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class ClusterMultiNodeTest {
   private static Logger LOGGER = LoggerFactory.getLogger(ClusterMultiNodeTest.class);
@@ -68,10 +68,10 @@ public class ClusterMultiNodeTest {
         String clusterHost = parts[0];
         int clusterPort = Integer.valueOf(parts[1]);
 
-        Class clazz = ClusterManagerServer.class;
-        Constructor constructor = clazz.getDeclaredConstructor();
+        Class<ClusterManagerServer> clazz = ClusterManagerServer.class;
+        Constructor<ClusterManagerServer> constructor = clazz.getDeclaredConstructor();
         constructor.setAccessible(true);
-        ClusterManagerServer clusterServer = (ClusterManagerServer) constructor.newInstance();
+        ClusterManagerServer clusterServer = constructor.newInstance();
         clusterServer.initTestCluster(clusterAddrList, clusterHost, clusterPort);
 
         clusterServers.add(clusterServer);
@@ -145,17 +145,16 @@ public class ClusterMultiNodeTest {
   public static void getClusterServerMeta() {
     LOGGER.info("getClusterServerMeta >>>");
     // Get metadata for all services
-    Object srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+    Map<String, Map<String, Object>> srvMeta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
     LOGGER.info(srvMeta.toString());
 
-    Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+    Map<String, Map<String, Object>> intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
     LOGGER.info(intpMeta.toString());
 
     assertNotNull(srvMeta);
-    assertEquals(true, (srvMeta instanceof HashMap));
-    HashMap hashMap = (HashMap) srvMeta;
+    assertTrue(srvMeta instanceof Map);
 
-    assertEquals(hashMap.size(), 3);
+    assertEquals(3, srvMeta.size());
     LOGGER.info("getClusterServerMeta <<<");
   }
 }
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
index b6bb92102c..26d7bf9eb6 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterSingleNodeTest.java
@@ -28,9 +28,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class ClusterSingleNodeTest {
   private static Logger LOGGER = LoggerFactory.getLogger(ClusterSingleNodeTest.class);
@@ -98,22 +100,19 @@ public class ClusterSingleNodeTest {
     LOGGER.info("getServerMeta >>>");
 
     // Get metadata for all services
-    Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+    Map<String, Map<String, Object>> meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
     LOGGER.info(meta.toString());
 
-    Object intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
+    Map<String, Map<String, Object>> intpMeta = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
     LOGGER.info(intpMeta.toString());
 
     assertNotNull(meta);
-    assertEquals(true, (meta instanceof HashMap));
-    HashMap hashMap = (HashMap) meta;
+    assertTrue(meta instanceof Map);
 
     // Get metadata for the current service
-    Object values = hashMap.get(clusterClient.getClusterNodeName());
-    assertEquals(true, (values instanceof HashMap));
-    HashMap mapMetaValues = (HashMap) values;
-
-    assertEquals(true, mapMetaValues.size()>0);
+    Map<String, Object> values = meta.get(clusterClient.getClusterNodeName());
+    assertTrue(values instanceof Map);
+    assertTrue(values.size() > 0);
 
     LOGGER.info("getServerMeta <<<");
   }
@@ -121,7 +120,7 @@ public class ClusterSingleNodeTest {
   @Test
   public void putIntpProcessMeta() {
     // mock IntpProcess Meta
-    HashMap<String, Object> meta = new HashMap<>();
+    Map<String, Object> meta = new HashMap<>();
     meta.put(ClusterMeta.SERVER_HOST, zServerHost);
     meta.put(ClusterMeta.SERVER_PORT, zServerPort);
     meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
@@ -135,7 +134,7 @@ public class ClusterSingleNodeTest {
     clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta);
 
     // get IntpProcess Meta
-    HashMap<String, HashMap<String, Object>> check
+    Map<String, Map<String, Object>> check
         = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey);
 
     LOGGER.info(check.toString());
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
index ef7bc68e8c..f428bc2346 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
@@ -23,7 +23,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
@@ -54,9 +54,9 @@ public class ClusterInterpreterCheckThread extends Thread {
             ZeppelinConfiguration.create());
 
     clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout,
-        new ClusterCallback<HashMap<String, Object>>() {
+        new ClusterCallback<Map<String, Object>>() {
           @Override
-          public InterpreterClient online(HashMap<String, Object> result) {
+          public InterpreterClient online(Map<String, Object> result) {
             String intpTSrvHost = (String) result.get(INTP_TSERVER_HOST);
             int intpTSrvPort = (int) result.get(INTP_TSERVER_PORT);
             LOGGER.info("Found cluster interpreter {}:{}", intpTSrvHost, intpTSrvPort);
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index 397f6ab515..b34be06230 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -61,7 +61,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
 
   @Override
   public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
-    LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+    LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
 
     this.context = context;
     this.properties = context.getProperties();
@@ -70,9 +70,9 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
 
     // connect exist Interpreter Process
     InterpreterClient intpClient = clusterServer.getIntpProcessStatus(
-        intpGroupId, 3000, new ClusterCallback<HashMap<String, Object>>() {
+        intpGroupId, 3000, new ClusterCallback<Map<String, Object>>() {
           @Override
-          public InterpreterClient online(HashMap<String, Object> result) {
+          public InterpreterClient online(Map<String, Object> result) {
             String intpTserverHost = (String) result.get(INTP_TSERVER_HOST);
             int intpTserverPort = (int) result.get(INTP_TSERVER_PORT);
 
@@ -100,7 +100,7 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
     // No process was found for the InterpreterGroup ID
     String srvHost = null;
     int srvPort = 0;
-    HashMap<String, Object> meta = clusterServer.getIdleNodeMeta();
+    Map<String, Object> meta = clusterServer.getIdleNodeMeta();
     if (null == meta) {
       LOGGER.error("Don't get idle node meta, launch interpreter on local.");
       InterpreterClient clusterIntpProcess = createInterpreterProcess(context);
@@ -145,9 +145,9 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
     String finalSrvHost = srvHost;
     int finalSrvPort = srvPort;
     intpClient = clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout,
-        new ClusterCallback<HashMap<String, Object>>() {
+        new ClusterCallback<Map<String, Object>>() {
           @Override
-          public InterpreterClient online(HashMap<String, Object> result) {
+          public InterpreterClient online(Map<String, Object> result) {
             // connect exist Interpreter Process
             String intpTserverHost = (String) result.get(INTP_TSERVER_HOST);
             int intpTserverPort = (int) result.get(INTP_TSERVER_PORT);
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
index 995da7ecd9..743f800ce9 100644
--- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -33,7 +33,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class ClusterInterpreterLauncherTest extends ClusterMockTest {
-  private static Logger LOGGER = LoggerFactory.getLogger(ClusterInterpreterLauncherTest.class);
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(ClusterInterpreterLauncherTest.class);
 
   @BeforeClass
   public static void startTest() throws IOException, InterruptedException {
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
index dfacfa0c46..a6be913257 100644
--- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.LocalDateTime;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.OFFLINE_STATUS;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
@@ -112,27 +113,26 @@ public class ClusterMockTest {
     LOGGER.info("serverMeta >>>");
 
     // Get metadata for all services
-    Object meta = clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
+    Map<String, Map<String, Object>> meta =
+      clusterClient.getClusterMeta(ClusterMetaType.SERVER_META, "");
 
     LOGGER.info(meta.toString());
 
     assertNotNull(meta);
-    assertEquals(true, (meta instanceof HashMap));
-    HashMap hashMap = (HashMap) meta;
+    assertEquals(true, (meta instanceof Map));
 
     // Get metadata for the current service
-    Object values = hashMap.get(zServerHost + ":" + zServerPort);
-    assertEquals(true, (values instanceof HashMap));
-    HashMap mapMetaValues = (HashMap) values;
+    Map<String, Object> values = meta.get(zServerHost + ":" + zServerPort);
+    assertEquals(true, (values instanceof Map));
 
-    assertEquals(true, mapMetaValues.size() > 0);
+    assertEquals(true, values.size() > 0);
 
     LOGGER.info("serverMeta <<<");
   }
 
   public void mockIntpProcessMeta(String metaKey, boolean online) {
     // mock IntpProcess Meta
-    HashMap<String, Object> meta = new HashMap<>();
+    Map<String, Object> meta = new HashMap<>();
     meta.put(ClusterMeta.SERVER_HOST, "127.0.0.1");
     meta.put(ClusterMeta.SERVER_PORT, 6000);
     meta.put(ClusterMeta.INTP_TSERVER_HOST, "127.0.0.1");
@@ -152,7 +152,7 @@ public class ClusterMockTest {
     clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, meta);
 
     // get IntpProcess Meta
-    HashMap<String, HashMap<String, Object>> check
+    Map<String, Map<String, Object>> check
         = clusterClient.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey);
 
     LOGGER.info(check.toString());
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java
index e0c911a17b..31d936e077 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/ClusterRestApi.java
@@ -39,6 +39,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * clusters Rest api.
@@ -53,7 +54,7 @@ public class ClusterRestApi {
 
 
   // Do not modify, Use by `zeppelin-web/src/app/cluster/cluster.html`
-  private static String PROPERTIES = "properties";
+  private static final String PROPERTIES = "properties";
 
   @Inject
   public ClusterRestApi(ZeppelinConfiguration zConf) {
@@ -85,13 +86,13 @@ public class ClusterRestApi {
   public Response getClusterNodes(){
     List<Map<String, Object>> nodes = new ArrayList<>();
 
-    Map<String, HashMap<String, Object>> clusterMeta;
-    Map<String, HashMap<String, Object>> intpMeta;
+    Map<String, Map<String, Object>> clusterMeta;
+    Map<String, Map<String, Object>> intpMeta;
     clusterMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.SERVER_META, "");
     intpMeta = clusterManagerServer.getClusterMeta(ClusterMetaType.INTP_PROCESS_META, "");
 
     // Number of calculation processes
-    for (Map.Entry<String, HashMap<String, Object>> serverMetaEntity : clusterMeta.entrySet()) {
+    for (Entry<String, Map<String, Object>> serverMetaEntity : clusterMeta.entrySet()) {
       if (!serverMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME)) {
         continue;
       }
@@ -99,7 +100,7 @@ public class ClusterRestApi {
 
       List<String> arrIntpProcess = new ArrayList<>();
       int intpProcCount = 0;
-      for (Map.Entry<String, HashMap<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
+      for (Map.Entry<String, Map<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
         if (!intpMetaEntity.getValue().containsKey(ClusterMeta.NODE_NAME)
             && !intpMetaEntity.getValue().containsKey(ClusterMeta.INTP_PROCESS_NAME)) {
           continue;
@@ -116,7 +117,7 @@ public class ClusterRestApi {
       serverMetaEntity.getValue().put(ClusterMeta.INTP_PROCESS_LIST, arrIntpProcess);
     }
 
-    for (Map.Entry<String, HashMap<String, Object>> entry : clusterMeta.entrySet()) {
+    for (Map.Entry<String, Map<String, Object>> entry : clusterMeta.entrySet()) {
       String nodeName = entry.getKey();
       Map<String, Object> properties = entry.getValue();
 
@@ -197,11 +198,11 @@ public class ClusterRestApi {
                                  @PathParam("intpName") String intpName){
     List<Map<String, Object>> intpProcesses = new ArrayList<>();
 
-    Map<String, HashMap<String, Object>> intpMeta = clusterManagerServer.getClusterMeta(
+    Map<String, Map<String, Object>> intpMeta = clusterManagerServer.getClusterMeta(
             ClusterMetaType.INTP_PROCESS_META, "");
 
     // Number of calculation processes
-    for (Map.Entry<String, HashMap<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
+    for (Map.Entry<String, Map<String, Object>> intpMetaEntity : intpMeta.entrySet()) {
       String intpNodeName = (String) intpMetaEntity.getValue().get(ClusterMeta.NODE_NAME);
 
       if (null != intpNodeName && intpNodeName.equals(nodeName)) {
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
index 734611f272..79e9a6ee86 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterIntpSettingEventListenerTest.java
@@ -18,7 +18,6 @@ package org.apache.zeppelin.cluster;
 
 import org.apache.zeppelin.cluster.event.ClusterEventListener;
 import org.apache.zeppelin.cluster.event.ClusterMessage;
-import org.apache.zeppelin.user.AuthenticationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,5 +33,6 @@ public class ClusterIntpSettingEventListenerTest implements ClusterEventListener
     receiveMsg = msg;
     LOGGER.info("ClusterIntpSettingEventListenerTest#onClusterEvent : {}", msg);
     ClusterMessage message = ClusterMessage.deserializeMessage(msg);
+    assertNotNull(message);
   }
 }
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
index 0685cf9668..a34e2879d6 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ClusterNoteEventListenerTest.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 public class ClusterNoteEventListenerTest implements ClusterEventListener {