You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/18 03:08:16 UTC
[zeppelin] branch master updated: [ZEPPELIN-5193] Describe table
using full table name in FlinkSQL can not work
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new c45f415 [ZEPPELIN-5193] Describe table using full table name in FlinkSQL can not work
c45f415 is described below
commit c45f4152d022968d617616025ca0386130cbb4e5
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jun 14 23:38:31 2021 +0800
[ZEPPELIN-5193] Describe table using full table name in FlinkSQL can not work
### What is this PR for?
Simple PR to make describing table via full table name in flink sql work. Just split full table name via `dot` separator. So that flink api can recognize it correctly. See unit test for more details.
### What type of PR is it?
[ Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5193
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4138 from zjffdu/ZEPPELIN-5193 and squashes the following commits:
5b5c2817f7 [Jeff Zhang] [ZEPPELIN-5193]. Describe table using full table name in FlinkSQL can not work
---
.../main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 2 +-
.../test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java | 9 +++++++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index b4ce193..baa6ea7 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -469,7 +469,7 @@ public abstract class FlinkSqlInterrpeter extends AbstractInterpreter {
}
private void callDescribe(String name, InterpreterContext context) throws IOException {
- TableSchema schema = tbenv.scan(name).getSchema();
+ TableSchema schema = tbenv.scan(name.split("\\.")).getSchema();
StringBuilder builder = new StringBuilder();
builder.append("Column\tType\n");
for (int i = 0; i < schema.getFieldCount(); ++i) {
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 1d49686..debe9d1 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -195,6 +195,15 @@ public abstract class SqlInterpreterTest {
assertEquals("table\nsource\n", resultMessages.get(0).getData());
context = getInterpreterContext();
+ result = sqlInterpreter.interpret("describe db1.source", context);
+ assertEquals(Code.SUCCESS, result.code());
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(Type.TABLE, resultMessages.get(0).getType());
+ assertEquals("Column\tType\n" +
+ "msg\tINT\n"
+ , resultMessages.get(0).getData());
+
+ context = getInterpreterContext();
result = sqlInterpreter.interpret("use default", context);
assertEquals(Code.SUCCESS, result.code());