You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2023/01/16 08:06:13 UTC
[zeppelin] branch master updated: [ZEPPELIN-5874] polish flink code base (#4553)
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new fa2a132dec [ZEPPELIN-5874] polish flink code base (#4553)
fa2a132dec is described below
commit fa2a132dec28bb1350b8d95ea005b210081b0e70
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Mon Jan 16 09:06:07 2023 +0100
[ZEPPELIN-5874] polish flink code base (#4553)
---
.../flink/ApplicationModeExecutionEnvironment.java | 1 -
.../apache/zeppelin/flink/FlinkInterpreter.java | 6 ++--
.../apache/zeppelin/flink/FlinkSqlInterpreter.java | 2 +-
.../apache/zeppelin/flink/IPyFlinkInterpreter.java | 4 +--
.../java/org/apache/zeppelin/flink/JobManager.java | 34 +++++++++++-----------
.../apache/zeppelin/flink/PyFlinkInterpreter.java | 2 +-
.../org/apache/zeppelin/flink/TableEnvFactory.java | 24 +++++++--------
.../apache/zeppelin/flink/internal/JarHelper.java | 12 +++-----
.../flink/internal/ScalaShellEnvironment.java | 3 --
.../zeppelin/flink/sql/AbstractStreamSqlJob.java | 16 +++++-----
.../zeppelin/flink/sql/AppendStreamSqlJob.java | 7 ++---
.../zeppelin/flink/sql/SingleRowStreamSqlJob.java | 10 +++----
.../zeppelin/flink/sql/UpdateStreamSqlJob.java | 12 ++++----
.../apache/zeppelin/spark/PySparkInterpreter.java | 1 +
.../apache/zeppelin/spark/SparkInterpreter.java | 9 +++---
.../interpreter/SingleRowInterpreterResult.java | 4 +--
.../SingleRowInterpreterResultTest.java | 9 +++---
17 files changed, 75 insertions(+), 81 deletions(-)
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
index 0a96734e86..52ba6fe033 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
-import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.zeppelin.flink.internal.FlinkILoop;
import java.io.File;
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index 63c69a004d..a10128f94c 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -53,7 +53,7 @@ public class FlinkInterpreter extends Interpreter {
private String extractScalaVersion() throws InterpreterException {
String scalaVersionString = scala.util.Properties.versionString();
- LOGGER.info("Using Scala: " + scalaVersionString);
+ LOGGER.info("Using Scala: {}", scalaVersionString);
if (scalaVersionString.contains("version 2.11")) {
return "2.11";
} else if (scalaVersionString.contains("version 2.12")) {
@@ -87,7 +87,7 @@ public class FlinkInterpreter extends Interpreter {
String scalaVersion = extractScalaVersion();
ClassLoader flinkScalaClassLoader = FlinkScalaInterpreter.class.getClassLoader();
String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
- Class clazz = Class.forName(innerIntpClassName);
+ Class<?> clazz = Class.forName(innerIntpClassName);
return (FlinkScalaInterpreter)
clazz.getConstructor(Properties.class, URLClassLoader.class)
@@ -104,7 +104,7 @@ public class FlinkInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
- LOGGER.debug("Interpret code: " + st);
+ LOGGER.debug("Interpret code: {}", st);
this.z.setInterpreterContext(context);
this.z.setGui(context.getGui());
this.z.setNoteGui(context.getNoteGui());
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
index 8c55765a86..1145da4dda 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
@@ -46,7 +46,7 @@ public abstract class FlinkSqlInterpreter extends AbstractInterpreter {
@Override
protected InterpreterResult internalInterpret(String st, InterpreterContext context) throws InterpreterException {
- LOGGER.debug("Interpret code: " + st);
+ LOGGER.debug("Interpret code: {}", st);
// set ClassLoader of current Thread to be the ClassLoader of Flink scala-shell,
// otherwise codegen will fail to find classes defined in scala-shell
ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 817d13fa3e..1bc61821f8 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -101,7 +101,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
InterpreterResult result =
super.internalInterpret("intp.resetClassLoaderInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
- LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+ LOGGER.warn("Fail to resetClassLoaderInPythonThread: {}", result);
}
}
}
@@ -112,7 +112,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
flinkInterpreter.cancel(context);
super.cancel(context);
}
-
+
/**
* Called by python process.
*/
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 33a75dac40..61fdf4db80 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class JobManager {
- private static Logger LOGGER = LoggerFactory.getLogger(JobManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobManager.class);
public static final String LATEST_CHECKPOINT_PATH = "latest_checkpoint_path";
public static final String SAVEPOINT_PATH = "savepoint_path";
public static final String RESUME_FROM_SAVEPOINT = "resumeFromSavepoint";
@@ -62,7 +62,7 @@ public class JobManager {
LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}",
flinkWebUrl, displayedFlinkWebUrl);
}
-
+
public void addJob(InterpreterContext context, JobClient jobClient) {
String paragraphId = context.getParagraphId();
JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
@@ -83,18 +83,18 @@ public class JobManager {
}
public void removeJob(String paragraphId) {
- LOGGER.info("Remove job in paragraph: " + paragraphId);
+ LOGGER.info("Remove job in paragraph: {}", paragraphId);
JobClient jobClient = this.jobs.remove(paragraphId);
if (jobClient == null) {
- LOGGER.warn("Unable to remove job, because no job is associated with paragraph: "
- + paragraphId);
+ LOGGER.warn("Unable to remove job, because no job is associated with paragraph: {}",
+ paragraphId);
return;
}
FlinkJobProgressPoller jobProgressPoller =
this.jobProgressPollerMap.remove(jobClient.getJobID());
if (jobProgressPoller == null) {
- LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: "
- + paragraphId);
+ LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: {}",
+ paragraphId);
return;
}
@@ -114,21 +114,21 @@ public class JobManager {
infos.put("paraId", context.getParagraphId());
context.getIntpEventClient().onParaInfosReceived(infos);
} else {
- LOGGER.warn("No job is associated with paragraph: " + context.getParagraphId());
+ LOGGER.warn("No job is associated with paragraph: {}", context.getParagraphId());
}
}
public int getJobProgress(String paragraphId) {
JobClient jobClient = this.jobs.get(paragraphId);
if (jobClient == null) {
- LOGGER.warn("Unable to get job progress for paragraph: " + paragraphId +
- ", because no job is associated with this paragraph");
+ LOGGER.warn("Unable to get job progress for paragraph: {}"
+ + ", because no job is associated with this paragraph", paragraphId);
return 0;
}
FlinkJobProgressPoller jobProgressPoller = this.jobProgressPollerMap.get(jobClient.getJobID());
if (jobProgressPoller == null) {
- LOGGER.warn("Unable to get job progress for paragraph: " + paragraphId +
- ", because no job progress is associated with this jobId: " + jobClient.getJobID());
+ LOGGER.warn("Unable to get job progress for paragraph: {}"
+ + ", because no job progress is associated with this jobId: {}", paragraphId, jobClient.getJobID());
return 0;
}
return jobProgressPoller.getProgress();
@@ -174,8 +174,8 @@ public class JobManager {
throw new InterpreterException(errorMessage, e);
} finally {
if (cancelled) {
- LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: "
- + context.getParagraphId());
+ LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: {}",
+ context.getParagraphId());
FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobClient.getJobID());
if (jobProgressPoller != null) {
jobProgressPoller.cancel();
@@ -231,11 +231,11 @@ public class JobManager {
totalTasks += vertex.getInt("parallelism");
finishedTasks += vertex.getJSONObject("tasks").getInt("FINISHED");
}
- LOGGER.debug("Total tasks:" + totalTasks);
- LOGGER.debug("Finished tasks:" + finishedTasks);
+ LOGGER.debug("Total tasks:{}", totalTasks);
+ LOGGER.debug("Finished tasks:{}", finishedTasks);
if (finishedTasks != 0) {
this.progress = finishedTasks * 100 / totalTasks;
- LOGGER.debug("Progress: " + this.progress);
+ LOGGER.debug("Progress: {}", this.progress);
}
String jobState = rootNode.getObject().getString("state");
if (jobState.equalsIgnoreCase("finished")) {
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 3a33cd7c4b..df203b71b8 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -123,7 +123,7 @@ public class PyFlinkInterpreter extends PythonInterpreter {
if (useIPython() || (!useIPython() && getPythonProcessLauncher().isRunning())) {
InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context);
if (result.code() != InterpreterResult.Code.SUCCESS) {
- LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString());
+ LOGGER.warn("Fail to resetClassLoaderInPythonThread: {}", result);
}
}
}
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
index 711993578d..5ec2de96eb 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java
@@ -44,7 +44,7 @@ import java.lang.reflect.Constructor;
*/
public class TableEnvFactory {
- private static Logger LOGGER = LoggerFactory.getLogger(TableEnvFactory.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(TableEnvFactory.class);
private FlinkVersion flinkVersion;
private FlinkShims flinkShims;
@@ -99,10 +99,10 @@ public class TableEnvFactory {
public TableEnvironment createScalaFlinkBatchTableEnvironment() {
try {
- Class clazz = Class
+ Class<?> clazz = Class
.forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl");
- Constructor constructor = clazz
+ Constructor<?> constructor = clazz
.getConstructor(
org.apache.flink.api.scala.ExecutionEnvironment.class,
TableConfig.class,
@@ -121,7 +121,7 @@ public class TableEnvFactory {
Class<?> clazz = Class
.forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl");
- Constructor con = clazz.getConstructor(
+ Constructor<?> con = clazz.getConstructor(
ExecutionEnvironment.class,
TableConfig.class,
CatalogManager.class,
@@ -146,10 +146,10 @@ public class TableEnvFactory {
Planner planner = (Planner) pair.left;
Executor executor = (Executor) pair.right;
- Class clazz = Class
+ Class<?> clazz = Class
.forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl");
try {
- Constructor constructor = clazz
+ Constructor<?> constructor = clazz
.getConstructor(
CatalogManager.class,
ModuleManager.class,
@@ -169,7 +169,7 @@ public class TableEnvFactory {
settings.isStreamingMode());
} catch (NoSuchMethodException e) {
// Flink 1.11.1 change the constructor signature, FLINK-18419
- Constructor constructor = clazz
+ Constructor<?> constructor = clazz
.getConstructor(
CatalogManager.class,
ModuleManager.class,
@@ -203,11 +203,11 @@ public class TableEnvFactory {
Planner planner = (Planner) pair.left;
Executor executor = (Executor) pair.right;
- Class clazz = Class
+ Class<?> clazz = Class
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
try {
- Constructor constructor = clazz
+ Constructor<?> constructor = clazz
.getConstructor(
CatalogManager.class,
ModuleManager.class,
@@ -262,10 +262,10 @@ public class TableEnvFactory {
Planner planner = (Planner) pair.left;
Executor executor = (Executor) pair.right;
- Class clazz = Class
+ Class<?> clazz = Class
.forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl");
try {
- Constructor constructor = clazz.getConstructor(
+ Constructor<?> constructor = clazz.getConstructor(
CatalogManager.class,
ModuleManager.class,
FunctionCatalog.class,
@@ -285,7 +285,7 @@ public class TableEnvFactory {
settings.isStreamingMode());
} catch (NoSuchMethodException e) {
// Flink 1.11.1 change the constructor signature, FLINK-18419
- Constructor constructor = clazz.getConstructor(
+ Constructor<?> constructor = clazz.getConstructor(
CatalogManager.class,
ModuleManager.class,
FunctionCatalog.class,
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java
index 07b5033238..648fe51e0f 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java
@@ -72,16 +72,13 @@ public class JarHelper {
}
mDestJarName = destJar.getCanonicalPath();
- FileOutputStream fout = new FileOutputStream(destJar);
- JarOutputStream jout = new JarOutputStream(fout);
- // jout.setLevel(0);
- try {
+ try (
+ FileOutputStream fout = new FileOutputStream(destJar);
+ JarOutputStream jout = new JarOutputStream(fout)) {
+ // jout.setLevel(0);
jarDir(dirOrFile2Jar, jout, null);
} catch (IOException ioe) {
throw ioe;
- } finally {
- jout.close();
- fout.close();
}
}
@@ -89,7 +86,6 @@ public class JarHelper {
* Unjars a given jar file into a given directory.
*/
public void unjarDir(File jarFile, File destDir) throws IOException {
- BufferedOutputStream dest = null;
FileInputStream fis = new FileInputStream(jarFile);
unjar(fis, destDir);
}
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java
index ebd9f1d2fb..c8288b96f4 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java
@@ -19,11 +19,9 @@
package org.apache.zeppelin.flink.internal;
-import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.util.JarUtils;
@@ -34,7 +32,6 @@ import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
/**
* This class is copied from flink project, the reason is that flink scala shell only supports
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 1ee26143cc..e1e1642cbd 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -117,7 +117,7 @@ public abstract class AbstractStreamSqlJob {
this.schema = removeTimeAttributes(flinkShims, table.getSchema());
checkTableSchema(schema);
- LOGGER.info("ResultTable Schema: " + this.schema);
+ LOGGER.info("ResultTable Schema: {}", this.schema);
final RowTypeInfo outputType = new RowTypeInfo(schema.getFieldTypes(),
schema.getFieldNames());
@@ -132,8 +132,8 @@ public abstract class AbstractStreamSqlJob {
serializer);
// create table sink
// pass binding address and port such that sink knows where to send to
- LOGGER.debug("Collecting data at address: " + iterator.getBindAddress() +
- ":" + iterator.getPort());
+ LOGGER.debug("Collecting data at address: {}:{}",
+ iterator.getBindAddress(), iterator.getPort());
RetractStreamTableSink collectTableSink =
(RetractStreamTableSink) flinkShims.getCollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
// new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer);
@@ -149,16 +149,16 @@ public abstract class AbstractStreamSqlJob {
ResultRetrievalThread retrievalThread = new ResultRetrievalThread(refreshScheduler);
retrievalThread.start();
- LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism);
+ LOGGER.info("Run job: {}, parallelism: {}", tableName, parallelism);
String jobName = context.getStringLocalProperty("jobName", tableName);
table.executeInsert(tableName).await();
- LOGGER.info("Flink Job is finished, jobName: " + jobName);
+ LOGGER.info("Flink Job is finished, jobName: {}", jobName);
// wait for retrieve thread consume all data
LOGGER.info("Waiting for retrieve thread to be done");
retrievalThread.join();
refresh(context);
String finalResult = buildResult();
- LOGGER.info("Final Result: " + finalResult);
+ LOGGER.info("Final Result: {}", finalResult);
return finalResult;
} catch (Exception e) {
LOGGER.error("Fail to run stream sql job", e);
@@ -229,7 +229,7 @@ public abstract class AbstractStreamSqlJob {
isRunning = false;
LOGGER.info("ResultRetrieval Thread is done, isRunning={}, hasNext={}",
isRunning, iterator.hasNext());
- LOGGER.info("Final Result: " + buildResult());
+ LOGGER.info("Final Result: {}", buildResult());
refreshExecutorService.shutdownNow();
}
@@ -255,7 +255,7 @@ public abstract class AbstractStreamSqlJob {
if (!enableToRefresh) {
resultLock.wait();
}
- LOGGER.debug("Refresh result of paragraph: " + context.getParagraphId());
+ LOGGER.debug("Refresh result of paragraph: {}", context.getParagraphId());
refresh(context);
}
} catch (Exception e) {
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
index c6791ea6d3..705f9979b8 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Timestamp;
-import java.time.temporal.TemporalField;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -70,7 +69,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
@Override
protected void processInsert(Row row) {
- LOGGER.debug("processInsert: " + row.toString());
+ LOGGER.debug("processInsert: {}", row.toString());
materializedTable.add(row);
}
@@ -99,7 +98,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
return f1.compareTo(f2);
});
- if (materializedTable.size() != 0) {
+ if (!materializedTable.isEmpty()) {
// Timestamp type before/after Flink 1.14 has changed.
if (flinkShims.getFlinkVersion().isAfterFlink114()) {
java.time.LocalDateTime ldt = ((java.time.LocalDateTime) materializedTable
@@ -133,7 +132,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
String result = buildResult();
context.out.write(result);
context.out.flush();
- LOGGER.debug("Refresh with data: " + result);
+ LOGGER.debug("Refresh with data: {}", result);
} catch (IOException e) {
LOGGER.error("Fail to refresh data", e);
}
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
index 902ff42889..5f81e89549 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
@@ -25,7 +25,6 @@ import org.apache.zeppelin.flink.FlinkShims;
import org.apache.zeppelin.flink.JobManager;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.SingleRowInterpreterResult;
-import org.apache.zeppelin.tabledata.TableDataUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +34,7 @@ import java.util.List;
public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
- private static Logger LOGGER = LoggerFactory.getLogger(SingleRowStreamSqlJob.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SingleRowStreamSqlJob.class);
private Row latestRow;
private String template;
@@ -56,8 +55,9 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
return "single";
}
+ @Override
protected void processInsert(Row row) {
- LOGGER.debug("processInsert: " + row.toString());
+ LOGGER.debug("processInsert: {}", row);
latestRow = row;
}
@@ -95,8 +95,8 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
singleRowResult.pushAngularObjects();
}
- private List rowToList(Row row) {
- List list = new ArrayList<>();
+ private List<Object> rowToList(Row row) {
+ List<Object> list = new ArrayList<>();
for (int i = 0; i < row.getArity(); i++) {
list.add(row.getField(i));
}
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
index 44105b9a74..9a7c8b436c 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
@@ -35,7 +35,7 @@ import java.util.List;
public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
- private static Logger LOGGER = LoggerFactory.getLogger(UpdateStreamSqlJob.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpdateStreamSqlJob.class);
private List<Row> materializedTable = new ArrayList<>();
private List<Row> lastSnapshot = new ArrayList<>();
@@ -54,19 +54,21 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
return "retract";
}
+ @Override
protected void processInsert(Row row) {
enableToRefresh = true;
resultLock.notify();
- LOGGER.debug("processInsert: " + row.toString());
+ LOGGER.debug("processInsert: {}", row);
materializedTable.add(row);
}
+ @Override
protected void processDelete(Row row) {
enableToRefresh = false;
- LOGGER.debug("processDelete: " + row.toString());
+ LOGGER.debug("processDelete: {}", row);
for (int i = 0; i < materializedTable.size(); i++) {
if (flinkShims.rowEquals(materializedTable.get(i), row)) {
- LOGGER.debug("real processDelete: " + row.toString());
+ LOGGER.debug("real processDelete: {}", row);
materializedTable.remove(i);
break;
}
@@ -103,7 +105,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
String result = buildResult();
context.out.write(result);
context.out.flush();
- LOGGER.debug("Refresh with data: " + result);
+ LOGGER.debug("Refresh with data: {}", result);
this.lastSnapshot.clear();
for (Row row : materializedTable) {
this.lastSnapshot.add(row);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 737bef8f4b..ee891a62d4 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -192,6 +192,7 @@ public class PySparkInterpreter extends PythonInterpreter {
return "python";
}
+ @Override
public ZeppelinContext getZeppelinContext() {
if (sparkInterpreter != null) {
return sparkInterpreter.getZeppelinContext();
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 035924e603..62dfb1dd1a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -68,7 +68,7 @@ public class SparkInterpreter extends AbstractInterpreter {
}
private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
- private static Class innerInterpreterClazz;
+ private static Class<?> innerInterpreterClazz;
private AbstractSparkScalaInterpreter innerInterpreter;
private Map<String, String> innerInterpreterClassMap = new HashMap<>();
private SparkContext sc;
@@ -171,8 +171,8 @@ public class SparkInterpreter extends AbstractInterpreter {
File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion);
List<URL> urls = new ArrayList<>();
for (File file : scalaJarFolder.listFiles()) {
- LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
- + scalaJarFolder);
+ LOGGER.debug("Add file {} to classpath of spark scala interpreter: {}", file.getAbsolutePath(),
+ scalaJarFolder);
urls.add(file.toURI().toURL());
}
scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
@@ -232,6 +232,7 @@ public class SparkInterpreter extends AbstractInterpreter {
return innerInterpreter.getProgress(context);
}
+ @Override
public ZeppelinContext getZeppelinContext() {
if (this.innerInterpreter == null) {
throw new RuntimeException("innerInterpreterContext is null");
@@ -276,7 +277,7 @@ public class SparkInterpreter extends AbstractInterpreter {
} else {
scalaVersionString = scala.util.Properties.versionString();
}
- LOGGER.info("Using Scala: " + scalaVersionString);
+ LOGGER.info("Using Scala: {}", scalaVersionString);
if (StringUtils.isEmpty(scalaVersionString)) {
throw new InterpreterException("Scala Version is empty");
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java
index db975b2964..e83f8290eb 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java
@@ -29,10 +29,10 @@ import java.util.List;
public class SingleRowInterpreterResult {
private String template;
- private List values;
+ private List<Object> values;
private InterpreterContext context;
- public SingleRowInterpreterResult(List values, String template, InterpreterContext context) {
+ public SingleRowInterpreterResult(List<Object> values, String template, InterpreterContext context) {
this.values = values;
this.template = template;
this.context = context;
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java
index b169bc0350..a83c685fa6 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java
@@ -17,19 +17,18 @@
package org.apache.zeppelin.interpreter;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
-import static org.junit.Assert.assertEquals;
+import org.junit.Test;
public class SingleRowInterpreterResultTest {
@Test
public void testHtml() {
- List<Serializable> list = Arrays.asList("2020-01-01", 10);
+ List<Object> list = Arrays.asList("2020-01-01", 10);
String template = "Total count:{1} for {0}";
InterpreterContext context = InterpreterContext.builder().build();
SingleRowInterpreterResult singleRowInterpreterResult = new SingleRowInterpreterResult(list, template, context);
@@ -39,7 +38,7 @@ public class SingleRowInterpreterResultTest {
@Test
public void testAngular() {
- List<Serializable> list = Arrays.asList("2020-01-01", 10);
+ List<Object> list = Arrays.asList("2020-01-01", 10);
String template = "Total count:{1} for {0}";
InterpreterContext context = InterpreterContext.builder().build();
SingleRowInterpreterResult singleRowInterpreterResult = new SingleRowInterpreterResult(list, template, context);