You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/23 08:24:07 UTC

[zeppelin] branch master updated: [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new c98081c  [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter
c98081c is described below

commit c98081c7cf83e5d606ed09be2cdcbf7e845b6ad8
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Jun 18 17:27:32 2021 +0800

    [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter
    
    ### What is this PR for?
    
    Add timezone support for the flink interpreter. It is only available for flink 1.13; previous versions are not supported.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5392
    
    ### How should this be tested?
    * Manually tested.
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/121799463-d8ba1d00-cc5e-11eb-8a45-776b94496bd1.png)
    
    ### Questions:
    * Do the license files need updating? No
    * Is there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4137 from zjffdu/ZEPPELIN-5392 and squashes the following commits:
    
    77695c7acb [Jeff Zhang] set table in run method
    402c01c17d [Jeff Zhang] address comments
    b0b495463f [Jeff Zhang] [ZEPPELIN-5392] Timezone is not applied in the timestamp field in flink interpreter
---
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java    | 21 ++++++++++++++++++++-
 .../zeppelin/flink/sql/AppendStreamSqlJob.java      | 11 +----------
 .../zeppelin/flink/sql/UpdateStreamSqlJob.java      | 11 +----------
 .../java/org/apache/zeppelin/flink/FlinkShims.java  |  2 ++
 .../org/apache/zeppelin/flink/Flink110Shims.java    | 20 +++++++++++++++++++-
 .../org/apache/zeppelin/flink/Flink111Shims.java    |  6 ++++++
 .../org/apache/zeppelin/flink/Flink112Shims.java    |  6 ++++++
 .../org/apache/zeppelin/flink/Flink113Shims.java    | 16 ++++++++++++++++
 8 files changed, 71 insertions(+), 22 deletions(-)

diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 91cdb90..1fb7832 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -36,14 +36,18 @@ import org.apache.zeppelin.flink.FlinkShims;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.apache.zeppelin.tabledata.TableDataUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -57,6 +61,7 @@ public abstract class AbstractStreamSqlJob {
   private static AtomicInteger SQL_INDEX = new AtomicInteger(0);
   protected StreamExecutionEnvironment senv;
   protected TableEnvironment stenv;
+  private Table table;
   protected JobManager jobManager;
   protected InterpreterContext context;
   protected TableSchema schema;
@@ -99,7 +104,7 @@ public abstract class AbstractStreamSqlJob {
   protected abstract String getType();
 
   public String run(String st) throws IOException {
-    Table table = stenv.sqlQuery(st);
+    this.table = stenv.sqlQuery(st);
     String tableName = "UnnamedTable_" +
             "_" + SQL_INDEX.getAndIncrement();
     return run(table, tableName);
@@ -107,6 +112,7 @@ public abstract class AbstractStreamSqlJob {
 
   public String run(Table table, String tableName) throws IOException {
     try {
+      this.table = table;
       int parallelism = Integer.parseInt(context.getLocalProperties()
               .getOrDefault("parallelism", defaultParallelism + ""));
       this.schema = removeTimeAttributes(table.getSchema());
@@ -197,6 +203,19 @@ public abstract class AbstractStreamSqlJob {
 
   protected abstract String buildResult();
 
+  protected String tableToString(List<Row> rows) {
+    StringBuilder builder = new StringBuilder();
+    for (Row row : rows) {
+      String[] fields = flinkShims.rowToString(row, table, stenv.getConfig());
+      String rowString = Arrays.stream(fields)
+              .map(TableDataUtils::normalizeColumn)
+              .collect(Collectors.joining("\t"));
+      builder.append(rowString);
+      builder.append("\n");
+    }
+    return builder.toString();
+  }
+
   private class ResultRetrievalThread extends Thread {
 
     private ScheduledExecutorService refreshExecutorService;
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
index f1eb997..7f636f3 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java
@@ -107,16 +107,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob {
                       maxTimestamp - tsWindowThreshold)
               .collect(Collectors.toList());
 
-      for (Row row : materializedTable) {
-        for (int i = 0; i < row.getArity(); ++i) {
-          Object field = row.getField(i);
-          builder.append(TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(field)));
-          if (i != (row.getArity() - 1)) {
-            builder.append("\t");
-          }
-        }
-        builder.append("\n");
-      }
+      builder.append(tableToString(materializedTable));
     }
     builder.append("\n%text ");
     return builder.toString();
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
index dc1ecc7..44105b9 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java
@@ -91,16 +91,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob {
       String f2 = TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(r2.getField(0)));
       return f1.compareTo(f2);
     });
-    for (Row row : materializedTable) {
-      for (int i = 0; i < row.getArity(); ++i) {
-        Object field = row.getField(i);
-        builder.append(TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(field)));
-        if (i != (row.getArity() - 1)) {
-          builder.append("\t");
-        }
-      }
-      builder.append("\n");
-    }
+    builder.append(tableToString(materializedTable));
     builder.append("\n%text\n");
     return builder.toString();
   }
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index acdb4a9..c41488f 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -157,4 +157,6 @@ public abstract class FlinkShims {
   public void setOldPlanner(Object tableConfig) {
     // only needed after flink 1.13
   }
+
+  public abstract String[] rowToString(Object row, Object table, Object tableConfig);
 }
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index 9a03b95..3faa52c 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -44,8 +44,8 @@ import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkException;
 import org.apache.zeppelin.flink.shims110.CollectStreamTableSink;
@@ -319,4 +319,22 @@ public class Flink110Shims extends FlinkShims {
     }
     return configOptions;
   }
+
+  @Override
+  public String[] rowToString(Object row, Object table, Object tableConfig) {
+    return rowToString((Row) row);
+  }
+
+  private String[] rowToString(Row row) {
+    final String[] fields = new String[row.getArity()];
+    for (int i = 0; i < row.getArity(); i++) {
+      final Object field = row.getField(i);
+      if (field == null) {
+        fields[i] = "(NULL)";
+      } else {
+        fields[i] = EncodingUtils.objectToString(field);
+      }
+    }
+    return fields;
+  }
 }
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index f290fad..c8db525 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -79,6 +79,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
@@ -467,4 +468,9 @@ public class Flink111Shims extends FlinkShims {
     }
     return configOptions;
   }
+
+  @Override
+  public String[] rowToString(Object row, Object table, Object tableConfig) {
+    return PrintUtils.rowToString((Row) row);
+  }
 }
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index b3f6ccc..648826a 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -80,6 +80,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
@@ -480,4 +481,9 @@ public class Flink112Shims extends FlinkShims {
     }
     return configOptions;
   }
+
+  @Override
+  public String[] rowToString(Object row, Object table, Object tableConfig) {
+    return PrintUtils.rowToString((Row) row);
+  }
 }
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 8f6f813..054f07f 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -50,6 +51,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
@@ -82,6 +84,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
@@ -101,6 +104,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -492,4 +496,16 @@ public class Flink113Shims extends FlinkShims {
     ((TableConfig) tableConfig).getConfiguration()
             .set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD);
   }
+
+  @Override
+  public String[] rowToString(Object row, Object table, Object tableConfig) {
+    final String zone = ((TableConfig) tableConfig).getConfiguration()
+            .get(TableConfigOptions.LOCAL_TIME_ZONE);
+    ZoneId zoneId = TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+            ? ZoneId.systemDefault()
+            : ZoneId.of(zone);
+
+    ResolvedSchema resolvedSchema = ((Table) table).getResolvedSchema();
+    return PrintUtils.rowToString((Row) row, resolvedSchema, zoneId);
+  }
 }