You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/23 08:24:07 UTC
[zeppelin] branch master updated: [ZEPPELIN-5392] Timezone is not
applied in the timestamp field in flink interpreter
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new c98081c [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter
c98081c is described below
commit c98081c7cf83e5d606ed09be2cdcbf7e845b6ad8
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Jun 18 17:27:32 2021 +0800
[ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter
### What is this PR for?
Add timezone support for flink interpreter, It is only available for flink 1.13, previous version are not supported.
### What type of PR is it?
[ Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5392
### How should this be tested?
* Manually tested.
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/121799463-d8ba1d00-cc5e-11eb-8a45-776b94496bd1.png)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4137 from zjffdu/ZEPPELIN-5392 and squashes the following commits:
77695c7acb [Jeff Zhang] set table in run method
402c01c17d [Jeff Zhang] address comments
b0b495463f [Jeff Zhang] [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter
---
.../zeppelin/flink/sql/AbstractStreamSqlJob.java | 21 ++++++++++++++++++++-
.../zeppelin/flink/sql/AppendStreamSqlJob.java | 11 +----------
.../zeppelin/flink/sql/UpdateStreamSqlJob.java | 11 +----------
.../java/org/apache/zeppelin/flink/FlinkShims.java | 2 ++
.../org/apache/zeppelin/flink/Flink110Shims.java | 20 +++++++++++++++++++-
.../org/apache/zeppelin/flink/Flink111Shims.java | 6 ++++++
.../org/apache/zeppelin/flink/Flink112Shims.java | 6 ++++++
.../org/apache/zeppelin/flink/Flink113Shims.java | 16 ++++++++++++++++
8 files changed, 71 insertions(+), 22 deletions(-)
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 91cdb90..1fb7832 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -36,14 +36,18 @@ import org.apache.zeppelin.flink.FlinkShims;
import org.apache.zeppelin.flink.JobManager;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.apache.zeppelin.tabledata.TableDataUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -57,6 +61,7 @@ public abstract class AbstractStreamSqlJob {
private static AtomicInteger SQL_INDEX = new AtomicInteger(0);
protected StreamExecutionEnvironment senv;
protected TableEnvironment stenv;
+ private Table table;
protected JobManager jobManager;
protected InterpreterContext context;
protected TableSchema schema;
@@ -99,7 +104,7 @@ public abstract class AbstractStreamSqlJob {
protected abstract String getType();
public String run(String st) throws IOException {
- Table table = stenv.sqlQuery(st);
+ this.table = stenv.sqlQuery(st);
String tableName = "UnnamedTable_" +
"_" + SQL_INDEX.getAndIncrement();
return run(table, tableName);
@@ -107,6 +112,7 @@ public abstract class AbstractStreamSqlJob {
public String run(Table table, String tableName) throws IOException {
try {
+ this.table = table;
int parallelism = Integer.parseInt(context.getLocalProperties()
.getOrDefault("parallelism", defaultParallelism + ""));
this.schema = removeTimeAttributes(table.getSchema());
@@ -197,6 +203,19 @@ public abstract class AbstractStreamSqlJob {
protected abstract String buildResult();
+ protected String tableToString(List<Row> rows) {
+ StringBuilder builder = new StringBuilder();
+ for (Row row : rows) {
+ String[] fields = flinkShims.rowToString(row, table, stenv.getConfig());
+ String rowString = Arrays.stream(fields)
+ .map(TableDataUtils::normalizeColumn)
+ .collect(Collectors.joining("\t"));
+ builder.append(rowString);
+ builder.append("\n");
+ }
+ return builder.toString();
+ }
+
private class ResultRetrievalThread extends Thread {
private ScheduledExecutorService refreshExecutorService;
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
index f1eb997..7f636f3 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
@@ -107,16 +107,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
maxTimestamp - tsWindowThreshold)
.collect(Collectors.toList());
- for (Row row : materializedTable) {
- for (int i = 0; i < row.getArity(); ++i) {
- Object field = row.getField(i);
- builder.append(TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(field)));
- if (i != (row.getArity() - 1)) {
- builder.append("\t");
- }
- }
- builder.append("\n");
- }
+ builder.append(tableToString(materializedTable));
}
builder.append("\n%text ");
return builder.toString();
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
index dc1ecc7..44105b9 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
@@ -91,16 +91,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
String f2 = TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(r2.getField(0)));
return f1.compareTo(f2);
});
- for (Row row : materializedTable) {
- for (int i = 0; i < row.getArity(); ++i) {
- Object field = row.getField(i);
- builder.append(TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(field)));
- if (i != (row.getArity() - 1)) {
- builder.append("\t");
- }
- }
- builder.append("\n");
- }
+ builder.append(tableToString(materializedTable));
builder.append("\n%text\n");
return builder.toString();
}
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index acdb4a9..c41488f 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -157,4 +157,6 @@ public abstract class FlinkShims {
public void setOldPlanner(Object tableConfig) {
// only needed after flink 1.13
}
+
+ public abstract String[] rowToString(Object row, Object table, Object tableConfig);
}
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 9a03b95..3faa52c 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -44,8 +44,8 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkException;
import org.apache.zeppelin.flink.shims110.CollectStreamTableSink;
@@ -319,4 +319,22 @@ public class Flink110Shims extends FlinkShims {
}
return configOptions;
}
+
+ @Override
+ public String[] rowToString(Object row, Object table, Object tableConfig) {
+ return rowToString((Row) row);
+ }
+
+ private String[] rowToString(Row row) {
+ final String[] fields = new String[row.getArity()];
+ for (int i = 0; i < row.getArity(); i++) {
+ final Object field = row.getField(i);
+ if (field == null) {
+ fields[i] = "(NULL)";
+ } else {
+ fields[i] = EncodingUtils.objectToString(field);
+ }
+ }
+ return fields;
+ }
}
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index f290fad..c8db525 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -79,6 +79,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
@@ -467,4 +468,9 @@ public class Flink111Shims extends FlinkShims {
}
return configOptions;
}
+
+ @Override
+ public String[] rowToString(Object row, Object table, Object tableConfig) {
+ return PrintUtils.rowToString((Row) row);
+ }
}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index b3f6ccc..648826a 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -80,6 +80,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
@@ -480,4 +481,9 @@ public class Flink112Shims extends FlinkShims {
}
return configOptions;
}
+
+ @Override
+ public String[] rowToString(Object row, Object table, Object tableConfig) {
+ return PrintUtils.rowToString((Row) row);
+ }
}
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 8f6f813..054f07f 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -50,6 +51,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
@@ -82,6 +84,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
@@ -101,6 +104,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -492,4 +496,16 @@ public class Flink113Shims extends FlinkShims {
((TableConfig) tableConfig).getConfiguration()
.set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD);
}
+
+ @Override
+ public String[] rowToString(Object row, Object table, Object tableConfig) {
+ final String zone = ((TableConfig) tableConfig).getConfiguration()
+ .get(TableConfigOptions.LOCAL_TIME_ZONE);
+ ZoneId zoneId = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+ ? ZoneId.systemDefault()
+ : ZoneId.of(zone);
+
+ ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema();
+ return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId);
+ }
}