You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2023/01/16 08:06:13 UTC

[zeppelin] branch master updated: [ZEPPELIN-5874] polish flink code base (#4553)

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new fa2a132dec [ZEPPELIN-5874] polish flink code base (#4553)
fa2a132dec is described below

commit fa2a132dec28bb1350b8d95ea005b210081b0e70
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Mon Jan 16 09:06:07 2023 +0100

    [ZEPPELIN-5874] polish flink code base (#4553)
---
 .../flink/ApplicationModeExecutionEnvironment.java |  1 -
 .../apache/zeppelin/flink/FlinkInterpreter.java    |  6 ++--
 .../apache/zeppelin/flink/FlinkSqlInterpreter.java |  2 +-
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |  4 +--
 .../java/org/apache/zeppelin/flink/JobManager.java | 34 +++++++++++-----------
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |  2 +-
 .../org/apache/zeppelin/flink/TableEnvFactory.java | 24 +++++++--------
 .../apache/zeppelin/flink/internal/JarHelper.java  | 12 +++-----
 .../flink/internal/ScalaShellEnvironment.java      |  3 --
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java   | 16 +++++-----
 .../zeppelin/flink/sql/AppendStreamSqlJob.java     |  7 ++---
 .../zeppelin/flink/sql/SingleRowStreamSqlJob.java  | 10 +++----
 .../zeppelin/flink/sql/UpdateStreamSqlJob.java     | 12 ++++----
 .../apache/zeppelin/spark/PySparkInterpreter.java  |  1 +
 .../apache/zeppelin/spark/SparkInterpreter.java    |  9 +++---
 .../interpreter/SingleRowInterpreterResult.java    |  4 +--
 .../SingleRowInterpreterResultTest.java            |  9 +++---
 17 files changed, 75 insertions(+), 81 deletions(-)

diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
index 0a96734e86..52ba6fe033 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.zeppelin.flink.internal.FlinkILoop;
 
 import java.io.File;
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 63c69a004d..a10128f94c 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -53,7 +53,7 @@ public class FlinkInterpreter extends Interpreter {
 
   private String extractScalaVersion() throws InterpreterException {
     String scalaVersionString = scala.util.Properties.versionString();
-    LOGGER.info("Using Scala: " + scalaVersionString);
+    LOGGER.info("Using Scala: {}", scalaVersionString);
     if (scalaVersionString.contains("version 2.11")) {
       return "2.11";
     } else if (scalaVersionString.contains("version 2.12")) {
@@ -87,7 +87,7 @@ public class FlinkInterpreter extends Interpreter {
     String scalaVersion = extractScalaVersion();
     ClassLoader flinkScalaClassLoader = FlinkScalaInterpreter.class.getClassLoader();
     String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
-    Class clazz = Class.forName(innerIntpClassName);
+    Class<?> clazz = Class.forName(innerIntpClassName);
 
     return (FlinkScalaInterpreter)
             clazz.getConstructor(Properties.class, URLClassLoader.class)
@@ -104,7 +104,7 @@ public class FlinkInterpreter extends Interpreter {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    LOGGER.debug("Interpret code: " + st);
+    LOGGER.debug("Interpret code: {}", st);
     this.z.setInterpreterContext(context);
     this.z.setGui(context.getGui());
     this.z.setNoteGui(context.getNoteGui());
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
index 8c55765a86..1145da4dda 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
@@ -46,7 +46,7 @@ public abstract class FlinkSqlInterpreter extends AbstractInterpreter {
 
   @Override
   protected InterpreterResult internalInterpret(String st, InterpreterContext context) throws InterpreterException {
-    LOGGER.debug("Interpret code: " + st);
+    LOGGER.debug("Interpret code: {}", st);
     // set ClassLoader of current Thread to be the ClassLoader of Flink scala-shell,
     // otherwise codegen will fail to find classes defined in scala-shell
     ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 817d13fa3e..1bc61821f8 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -101,7 +101,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
         InterpreterResult result =
                 super.internalInterpret("intp.resetClassLoaderInPythonThread()", context);
         if (result.code() != InterpreterResult.Code.SUCCESS) {
-          LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+          LOGGER.warn("Fail to resetClassLoaderInPythonThread: {}", result);
         }
       }
     }
@@ -112,7 +112,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
     flinkInterpreter.cancel(context);
     super.cancel(context);
   }
-  
+
   /**
    * Called by python process.
    */
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 33a75dac40..61fdf4db80 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class JobManager {
 
-  private static Logger LOGGER = LoggerFactory.getLogger(JobManager.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(JobManager.class);
   public static final String LATEST_CHECKPOINT_PATH = "latest_checkpoint_path";
   public static final String SAVEPOINT_PATH = "savepoint_path";
   public static final String RESUME_FROM_SAVEPOINT = "resumeFromSavepoint";
@@ -62,7 +62,7 @@ public class JobManager {
     LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}",
             flinkWebUrl, displayedFlinkWebUrl);
   }
-  
+
   public void addJob(InterpreterContext context, JobClient jobClient) {
     String paragraphId = context.getParagraphId();
     JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
@@ -83,18 +83,18 @@ public class JobManager {
   }
 
   public void removeJob(String paragraphId) {
-    LOGGER.info("Remove job in paragraph: " + paragraphId);
+    LOGGER.info("Remove job in paragraph: {}", paragraphId);
     JobClient jobClient = this.jobs.remove(paragraphId);
     if (jobClient == null) {
-      LOGGER.warn("Unable to remove job, because no job is associated with paragraph: "
-              + paragraphId);
+      LOGGER.warn("Unable to remove job, because no job is associated with paragraph: {}",
+        paragraphId);
       return;
     }
     FlinkJobProgressPoller jobProgressPoller =
             this.jobProgressPollerMap.remove(jobClient.getJobID());
     if (jobProgressPoller == null) {
-        LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: "
-                + paragraphId);
+      LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: {}",
+        paragraphId);
         return;
     }
 
@@ -114,21 +114,21 @@ public class JobManager {
       infos.put("paraId", context.getParagraphId());
       context.getIntpEventClient().onParaInfosReceived(infos);
     } else {
-      LOGGER.warn("No job is associated with paragraph: " + context.getParagraphId());
+      LOGGER.warn("No job is associated with paragraph: {}", context.getParagraphId());
     }
   }
 
   public int getJobProgress(String paragraphId) {
     JobClient jobClient = this.jobs.get(paragraphId);
     if (jobClient == null) {
-      LOGGER.warn("Unable to get job progress for paragraph: " + paragraphId +
-              ", because no job is associated with this paragraph");
+      LOGGER.warn("Unable to get job progress for paragraph: {}"
+        + ", because no job is associated with this paragraph", paragraphId);
       return 0;
     }
     FlinkJobProgressPoller jobProgressPoller = this.jobProgressPollerMap.get(jobClient.getJobID());
     if (jobProgressPoller == null) {
-      LOGGER.warn("Unable to get job progress for paragraph: " + paragraphId +
-              ", because no job progress is associated with this jobId: " + jobClient.getJobID());
+      LOGGER.warn("Unable to get job progress for paragraph: {}"
+        + ", because no job progress is associated with this jobId: {}", paragraphId, jobClient.getJobID());
       return 0;
     }
     return jobProgressPoller.getProgress();
@@ -174,8 +174,8 @@ public class JobManager {
       throw new InterpreterException(errorMessage, e);
     } finally {
       if (cancelled) {
-        LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: "
-                + context.getParagraphId());
+        LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: {}",
+          context.getParagraphId());
         FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobClient.getJobID());
         if (jobProgressPoller != null) {
           jobProgressPoller.cancel();
@@ -231,11 +231,11 @@ public class JobManager {
             totalTasks += vertex.getInt("parallelism");
             finishedTasks += vertex.getJSONObject("tasks").getInt("FINISHED");
           }
-          LOGGER.debug("Total tasks:" + totalTasks);
-          LOGGER.debug("Finished tasks:" + finishedTasks);
+          LOGGER.debug("Total tasks:{}", totalTasks);
+          LOGGER.debug("Finished tasks:{}", finishedTasks);
           if (finishedTasks != 0) {
             this.progress = finishedTasks * 100 / totalTasks;
-            LOGGER.debug("Progress: " + this.progress);
+            LOGGER.debug("Progress: {}", this.progress);
           }
           String jobState = rootNode.getObject().getString("state");
           if (jobState.equalsIgnoreCase("finished")) {
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 3a33cd7c4b..df203b71b8 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -123,7 +123,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
       if (useIPython() || (!useIPython() && getPythonProcessLauncher().isRunning())) {
         InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
         if (result.code() != InterpreterResult.Code.SUCCESS) {
-          LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+          LOGGER.warn("Fail to resetClassLoaderInPythonThread: {}", result);
         }
       }
     }
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 711993578d..5ec2de96eb 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -44,7 +44,7 @@ import java.lang.reflect.Constructor;
  */
 public class TableEnvFactory {
 
-  private static Logger LOGGER = LoggerFactory.getLogger(TableEnvFactory.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(TableEnvFactory.class);
 
   private FlinkVersion flinkVersion;
   private FlinkShims flinkShims;
@@ -99,10 +99,10 @@ public class TableEnvFactory {
 
   public TableEnvironment createScalaFlinkBatchTableEnvironment() {
     try {
-      Class clazz = Class
+      Class<?> clazz = Class
                 .forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl");
 
-      Constructor constructor = clazz
+      Constructor<?> constructor = clazz
               .getConstructor(
                       org.apache.flink.api.scala.ExecutionEnvironment.class,
                       TableConfig.class,
@@ -121,7 +121,7 @@ public class TableEnvFactory {
       Class<?> clazz = Class
                 .forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl");
 
-      Constructor con = clazz.getConstructor(
+      Constructor<?> con = clazz.getConstructor(
               ExecutionEnvironment.class,
               TableConfig.class,
               CatalogManager.class,
@@ -146,10 +146,10 @@ public class TableEnvFactory {
       Planner planner = (Planner) pair.left;
       Executor executor = (Executor) pair.right;
 
-      Class clazz = Class
+      Class<?> clazz = Class
                 .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
       try {
-        Constructor constructor = clazz
+        Constructor<?> constructor = clazz
                 .getConstructor(
                         CatalogManager.class,
                         ModuleManager.class,
@@ -169,7 +169,7 @@ public class TableEnvFactory {
                 settings.isStreamingMode());
       } catch (NoSuchMethodException e) {
         // Flink 1.11.1 change the constructor signature, FLINK-18419
-        Constructor constructor = clazz
+        Constructor<?> constructor = clazz
                 .getConstructor(
                         CatalogManager.class,
                         ModuleManager.class,
@@ -203,11 +203,11 @@ public class TableEnvFactory {
       Planner planner = (Planner) pair.left;
       Executor executor = (Executor) pair.right;
 
-      Class clazz = Class
+      Class<?> clazz = Class
                 .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
 
       try {
-        Constructor constructor = clazz
+        Constructor<?> constructor = clazz
                 .getConstructor(
                         CatalogManager.class,
                         ModuleManager.class,
@@ -262,10 +262,10 @@ public class TableEnvFactory {
       Planner planner = (Planner) pair.left;
       Executor executor = (Executor) pair.right;
 
-      Class clazz = Class
+      Class<?> clazz = Class
                 .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
       try {
-        Constructor constructor = clazz.getConstructor(
+        Constructor<?> constructor = clazz.getConstructor(
                 CatalogManager.class,
                 ModuleManager.class,
                 FunctionCatalog.class,
@@ -285,7 +285,7 @@ public class TableEnvFactory {
                 settings.isStreamingMode());
       } catch (NoSuchMethodException e) {
         // Flink 1.11.1 change the constructor signature, FLINK-18419
-        Constructor constructor = clazz.getConstructor(
+        Constructor<?> constructor = clazz.getConstructor(
                 CatalogManager.class,
                 ModuleManager.class,
                 FunctionCatalog.class,
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java
index 07b5033238..648fe51e0f 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java
@@ -72,16 +72,13 @@ public class JarHelper {
     }
 
     mDestJarName = destJar.getCanonicalPath();
-    FileOutputStream fout = new FileOutputStream(destJar);
-    JarOutputStream jout = new JarOutputStream(fout);
-    // jout.setLevel(0);
-    try {
+    try (
+      FileOutputStream fout = new FileOutputStream(destJar);
+      JarOutputStream jout = new JarOutputStream(fout)) {
+      // jout.setLevel(0);
       jarDir(dirOrFile2Jar, jout, null);
     } catch (IOException ioe) {
       throw ioe;
-    } finally {
-      jout.close();
-      fout.close();
     }
   }
 
@@ -89,7 +86,6 @@ public class JarHelper {
    * Unjars a given jar file into a given directory.
    */
   public void unjarDir(File jarFile, File destDir) throws IOException {
-    BufferedOutputStream dest = null;
     FileInputStream fis = new FileInputStream(jarFile);
     unjar(fis, destDir);
   }
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java
index ebd9f1d2fb..c8288b96f4 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java
@@ -19,11 +19,9 @@
 package org.apache.zeppelin.flink.internal;
 
 
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.util.JarUtils;
@@ -34,7 +32,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This class is copied from flink project, the reason is that flink scala shell only supports
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 1ee26143cc..e1e1642cbd 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -117,7 +117,7 @@ public abstract class AbstractStreamSqlJob {
       this.schema = removeTimeAttributes(flinkShims, table.getSchema());
       checkTableSchema(schema);
 
-      LOGGER.info("ResultTable Schema: " + this.schema);
+      LOGGER.info("ResultTable Schema: {}", this.schema);
       final RowTypeInfo outputType = new RowTypeInfo(schema.getFieldTypes(),
               schema.getFieldNames());
 
@@ -132,8 +132,8 @@ public abstract class AbstractStreamSqlJob {
               serializer);
       // create table sink
       // pass binding address and port such that sink knows where to send to
-      LOGGER.debug("Collecting data at address: " + iterator.getBindAddress() +
-              ":" + iterator.getPort());
+      LOGGER.debug("Collecting data at address: {}:{}",
+        iterator.getBindAddress(), iterator.getPort());
       RetractStreamTableSink collectTableSink =
               (RetractStreamTableSink) flinkShims.getCollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
              // new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
@@ -149,16 +149,16 @@ public abstract class AbstractStreamSqlJob {
       ResultRetrievalThread retrievalThread = new ResultRetrievalThread(refreshScheduler);
       retrievalThread.start();
 
-      LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism);
+      LOGGER.info("Run job: {}, parallelism: {}", tableName, parallelism);
       String jobName = context.getStringLocalProperty("jobName", tableName);
       table.executeInsert(tableName).await();
-      LOGGER.info("Flink Job is finished, jobName: " + jobName);
+      LOGGER.info("Flink Job is finished, jobName: {}", jobName);
       // wait for retrieve thread consume all data
       LOGGER.info("Waiting for retrieve thread to be done");
       retrievalThread.join();
       refresh(context);
       String finalResult = buildResult();
-      LOGGER.info("Final Result: " + finalResult);
+      LOGGER.info("Final Result: {}", finalResult);
       return finalResult;
     } catch (Exception e) {
       LOGGER.error("Fail to run stream sql job", e);
@@ -229,7 +229,7 @@ public abstract class AbstractStreamSqlJob {
       isRunning = false;
       LOGGER.info("ResultRetrieval Thread is done, isRunning={}, hasNext={}",
               isRunning, iterator.hasNext());
-      LOGGER.info("Final Result: " + buildResult());
+      LOGGER.info("Final Result: {}", buildResult());
       refreshExecutorService.shutdownNow();
     }
 
@@ -255,7 +255,7 @@ public abstract class AbstractStreamSqlJob {
           if (!enableToRefresh) {
             resultLock.wait();
           }
-          LOGGER.debug("Refresh result of paragraph: " + context.getParagraphId());
+          LOGGER.debug("Refresh result of paragraph: {}", context.getParagraphId());
           refresh(context);
         }
       } catch (Exception e) {
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
index c6791ea6d3..705f9979b8 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Timestamp;
-import java.time.temporal.TemporalField;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -70,7 +69,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
 
   @Override
   protected void processInsert(Row row) {
-    LOGGER.debug("processInsert: " + row.toString());
+    LOGGER.debug("processInsert: {}", row.toString());
     materializedTable.add(row);
   }
 
@@ -99,7 +98,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
       return f1.compareTo(f2);
     });
 
-    if (materializedTable.size() != 0) {
+    if (!materializedTable.isEmpty()) {
       // Timestamp type before/after Flink 1.14 has changed.
       if (flinkShims.getFlinkVersion().isAfterFlink114()) {
         java.time.LocalDateTime ldt = ((java.time.LocalDateTime) materializedTable
@@ -133,7 +132,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
       String result = buildResult();
       context.out.write(result);
       context.out.flush();
-      LOGGER.debug("Refresh with data: " + result);
+      LOGGER.debug("Refresh with data: {}", result);
     } catch (IOException e) {
       LOGGER.error("Fail to refresh data", e);
     }
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
index 902ff42889..5f81e89549 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
@@ -25,7 +25,6 @@ import org.apache.zeppelin.flink.FlinkShims;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.SingleRowInterpreterResult;
-import org.apache.zeppelin.tabledata.TableDataUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +34,7 @@ import java.util.List;
 
 public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
 
-  private static Logger LOGGER = LoggerFactory.getLogger(SingleRowStreamSqlJob.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SingleRowStreamSqlJob.class);
 
   private Row latestRow;
   private String template;
@@ -56,8 +55,9 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
     return "single";
   }
 
+  @Override
   protected void processInsert(Row row) {
-    LOGGER.debug("processInsert: " + row.toString());
+    LOGGER.debug("processInsert: {}", row);
     latestRow = row;
   }
 
@@ -95,8 +95,8 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
     singleRowResult.pushAngularObjects();
   }
 
-  private List rowToList(Row row) {
-    List list = new ArrayList<>();
+  private List<Object> rowToList(Row row) {
+    List<Object> list = new ArrayList<>();
     for (int i = 0; i < row.getArity(); i++) {
       list.add(row.getField(i));
     }
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
index 44105b9a74..9a7c8b436c 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
@@ -35,7 +35,7 @@ import java.util.List;
 
 public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
 
-  private static Logger LOGGER = LoggerFactory.getLogger(UpdateStreamSqlJob.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpdateStreamSqlJob.class);
 
   private List<Row> materializedTable = new ArrayList<>();
   private List<Row> lastSnapshot = new ArrayList<>();
@@ -54,19 +54,21 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
     return "retract";
   }
 
+  @Override
   protected void processInsert(Row row) {
     enableToRefresh = true;
     resultLock.notify();
-    LOGGER.debug("processInsert: " + row.toString());
+    LOGGER.debug("processInsert: {}", row);
     materializedTable.add(row);
   }
 
+  @Override
   protected void processDelete(Row row) {
     enableToRefresh = false;
-    LOGGER.debug("processDelete: " + row.toString());
+    LOGGER.debug("processDelete: {}", row);
     for (int i = 0; i < materializedTable.size(); i++) {
       if (flinkShims.rowEquals(materializedTable.get(i), row)) {
-        LOGGER.debug("real processDelete: " + row.toString());
+        LOGGER.debug("real processDelete: {}", row);
         materializedTable.remove(i);
         break;
       }
@@ -103,7 +105,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
       String result = buildResult();
       context.out.write(result);
       context.out.flush();
-      LOGGER.debug("Refresh with data: " + result);
+      LOGGER.debug("Refresh with data: {}", result);
       this.lastSnapshot.clear();
       for (Row row : materializedTable) {
         this.lastSnapshot.add(row);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 737bef8f4b..ee891a62d4 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -192,6 +192,7 @@ public class PySparkInterpreter extends PythonInterpreter {
     return "python";
   }
 
+  @Override
   public ZeppelinContext getZeppelinContext() {
     if (sparkInterpreter != null) {
       return sparkInterpreter.getZeppelinContext();
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 035924e603..62dfb1dd1a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -68,7 +68,7 @@ public class SparkInterpreter extends AbstractInterpreter {
   }
 
   private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
-  private static Class innerInterpreterClazz;
+  private static Class<?> innerInterpreterClazz;
   private AbstractSparkScalaInterpreter innerInterpreter;
   private Map<String, String> innerInterpreterClassMap = new HashMap<>();
   private SparkContext sc;
@@ -171,8 +171,8 @@ public class SparkInterpreter extends AbstractInterpreter {
             File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion);
             List<URL> urls = new ArrayList<>();
             for (File file : scalaJarFolder.listFiles()) {
-              LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
-                      + scalaJarFolder);
+              LOGGER.debug("Add file {} to classpath of spark scala interpreter: {}", file.getAbsolutePath(),
+                scalaJarFolder);
               urls.add(file.toURI().toURL());
             }
             scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
@@ -232,6 +232,7 @@ public class SparkInterpreter extends AbstractInterpreter {
     return innerInterpreter.getProgress(context);
   }
 
+  @Override
   public ZeppelinContext getZeppelinContext() {
     if (this.innerInterpreter == null) {
       throw new RuntimeException("innerInterpreterContext is null");
@@ -276,7 +277,7 @@ public class SparkInterpreter extends AbstractInterpreter {
     } else {
       scalaVersionString = scala.util.Properties.versionString();
     }
-    LOGGER.info("Using Scala: " + scalaVersionString);
+    LOGGER.info("Using Scala: {}", scalaVersionString);
 
     if (StringUtils.isEmpty(scalaVersionString)) {
       throw new InterpreterException("Scala Version is empty");
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java
index db975b2964..e83f8290eb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java
@@ -29,10 +29,10 @@ import java.util.List;
 public class SingleRowInterpreterResult {
 
   private String template;
-  private List values;
+  private List<Object> values;
   private InterpreterContext context;
 
-  public SingleRowInterpreterResult(List values, String template, InterpreterContext context) {
+  public SingleRowInterpreterResult(List<Object> values, String template, InterpreterContext context) {
     this.values = values;
     this.template = template;
     this.context = context;
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java
index b169bc0350..a83c685fa6 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java
@@ -17,19 +17,18 @@
 
 package org.apache.zeppelin.interpreter;
 
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
 
 public class SingleRowInterpreterResultTest {
 
   @Test
   public void testHtml() {
-    List<Serializable> list = Arrays.asList("2020-01-01", 10);
+    List<Object> list = Arrays.asList("2020-01-01", 10);
     String template = "Total count:{1} for {0}";
     InterpreterContext context = InterpreterContext.builder().build();
     SingleRowInterpreterResult singleRowInterpreterResult = new SingleRowInterpreterResult(list, template, context);
@@ -39,7 +38,7 @@ public class SingleRowInterpreterResultTest {
 
   @Test
   public void testAngular() {
-    List<Serializable> list = Arrays.asList("2020-01-01", 10);
+    List<Object> list = Arrays.asList("2020-01-01", 10);
     String template = "Total count:{1} for {0}";
     InterpreterContext context = InterpreterContext.builder().build();
     SingleRowInterpreterResult singleRowInterpreterResult = new SingleRowInterpreterResult(list, template, context);