You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/02/05 05:33:07 UTC
[zeppelin] branch master updated: [ZEPPELIN-5217]. Move %flink.cmd
as a separated interpreter module
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 4cebdbf [ZEPPELIN-5217]. Move %flink.cmd as a separated interpreter module
4cebdbf is described below
commit 4cebdbf5aa2d04d37f6c729d22bd63f16f9a4694
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Jan 22 16:18:55 2021 +0800
[ZEPPELIN-5217]. Move %flink.cmd as a separated interpreter module
### What is this PR for?
Similar to ZEPPELIN-5213, this moves %flink.cmd into a separate interpreter module.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5217
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4038 from zjffdu/ZEPPELIN-5217 and squashes the following commits:
5e87345de [Jeff Zhang] [ZEPPELIN-5217]. Move %flink.cmd as a separated interpreter module
---
.github/workflows/core.yml | 10 +--
flink-cmd/pom.xml | 82 ++++++++++++++++++++++
.../zeppelin/flink/cmd}/FlinkCmdInterpreter.java | 33 +--------
.../org/apache/zeppelin/flink/cmd/YarnUtils.java | 58 +++++++++++++++
.../src/main/resources/interpreter-setting.json | 21 ++++++
.../flink/cmd/FlinkCmdInterpreterTest.java | 28 ++++++++
flink/interpreter/pom.xml | 16 -----
.../src/main/resources/interpreter-setting.json | 12 ----
pom.xml | 1 +
.../zeppelin/integration/FlinkIntegrationTest.java | 12 +++-
10 files changed, 207 insertions(+), 66 deletions(-)
diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index c0e7c21..5d5871e 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -66,7 +66,7 @@ jobs:
test-interpreter-modules:
runs-on: ubuntu-18.04
env:
- INTERPRETERS: 'beam,hbase,pig,jdbc,file,flink,ignite,kylin,lens,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb'
+ INTERPRETERS: 'beam,hbase,pig,jdbc,file,flink,flink-cmd,ignite,kylin,lens,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb'
steps:
- name: Checkout
uses: actions/checkout@v2
@@ -125,10 +125,10 @@ jobs:
R -e "IRkernel::installspec()"
- name: install environment
run: |
- mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/spark-dependencies,markdown,flink/interpreter,jdbc,shell -am
+ mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/spark-dependencies,markdown,flink-cmd,flink/interpreter,jdbc,shell -am
mvn package -DskipRat -pl zeppelin-plugins -amd -DskipTests -B
- name: run tests
- run: mvn test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest
+ run: mvn test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -DfailIfNoTests=false -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest
test-flink-and-flink-integration-test:
runs-on: ubuntu-18.04
strategy:
@@ -160,10 +160,10 @@ jobs:
auto-activate-base: false
- name: install environment
run: |
- mvn install -DskipTests -DskipRat -am -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B
+ mvn install -DskipTests -DskipRat -am -pl flink/interpreter,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B
mvn clean package -pl zeppelin-plugins -amd -DskipTests -B
- name: run tests
- run: mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }}
+ run: mvn test -DskipRat -pl flink/interpreter,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }}
run-spark-intergration-test:
runs-on: ubuntu-18.04
steps:
diff --git a/flink-cmd/pom.xml b/flink-cmd/pom.xml
new file mode 100644
index 0000000..fef06da
--- /dev/null
+++ b/flink-cmd/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zeppelin-interpreter-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.10.0-SNAPSHOT</version>
+ <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-flink-cmd</artifactId>
+ <packaging>jar</packaging>
+ <version>0.10.0-SNAPSHOT</version>
+ <name>Zeppelin: Flink-Cmd interpreter</name>
+
+ <properties>
+ <interpreter.name>flink-cmd</interpreter.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-shell</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java b/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreter.java
similarity index 68%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java
rename to flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreter.java
index 4558b80..e792c57 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java
+++ b/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreter.java
@@ -16,12 +16,8 @@
*/
-package org.apache.zeppelin.flink;
+package org.apache.zeppelin.flink.cmd;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@@ -31,7 +27,6 @@ import org.apache.zeppelin.shell.ShellInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
import java.util.Properties;
public class FlinkCmdInterpreter extends ShellInterpreter {
@@ -81,35 +76,11 @@ public class FlinkCmdInterpreter extends ShellInterpreter {
}
if (text.contains("Submitted application")) {
// yarn mode, extract yarn proxy url as flink ui link
- buildFlinkUIInfo(text, context);
+ YarnUtils.buildFlinkUIInfo(text, context);
isFlinkUrlSent = true;
}
}
- private void buildFlinkUIInfo(String log, InterpreterContext context) {
- int pos = log.lastIndexOf(" ");
- if (pos != -1) {
- String appId = log.substring(pos + 1);
- try {
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(new YarnConfiguration());
- yarnClient.start();
-
- ApplicationReport applicationReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
- Map<String, String> infos = new java.util.HashMap<String, String>();
- infos.put("jobUrl", applicationReport.getTrackingUrl());
- infos.put("label", "Flink UI");
- infos.put("tooltip", "View in Flink web UI");
- infos.put("noteId", context.getNoteId());
- infos.put("paraId", context.getParagraphId());
- context.getIntpEventClient().onParaInfosReceived(infos);
- } catch (Exception e) {
- LOGGER.error("Fail to extract flink url", e);
- }
- } else {
- LOGGER.error("Unable to extract flink url from this log: " + log);
- }
- }
@Override
public void onUpdate(int index, InterpreterResultMessageOutput out) {
diff --git a/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/YarnUtils.java b/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/YarnUtils.java
new file mode 100644
index 0000000..87073f3
--- /dev/null
+++ b/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/YarnUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.cmd;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class YarnUtils {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(YarnUtils.class);
+
+ public static void buildFlinkUIInfo(String log, InterpreterContext context) {
+ int pos = log.lastIndexOf(" ");
+ if (pos != -1) {
+ String appId = log.substring(pos + 1);
+ try {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(new YarnConfiguration());
+ yarnClient.start();
+
+ ApplicationReport applicationReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
+ Map<String, String> infos = new java.util.HashMap<String, String>();
+ infos.put("jobUrl", applicationReport.getTrackingUrl());
+ infos.put("label", "Flink UI");
+ infos.put("tooltip", "View in Flink web UI");
+ infos.put("noteId", context.getNoteId());
+ infos.put("paraId", context.getParagraphId());
+ context.getIntpEventClient().onParaInfosReceived(infos);
+ } catch (Exception e) {
+ LOGGER.error("Fail to extract flink url", e);
+ }
+ } else {
+ LOGGER.error("Unable to extract flink url from this log: " + log);
+ }
+ }
+}
diff --git a/flink-cmd/src/main/resources/interpreter-setting.json b/flink-cmd/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..0a13462
--- /dev/null
+++ b/flink-cmd/src/main/resources/interpreter-setting.json
@@ -0,0 +1,21 @@
+[
+ {
+ "group": "flink-cmd",
+ "name": "cmd",
+ "className": "org.apache.zeppelin.flink.cmd.FlinkCmdInterpreter",
+ "properties": {
+ "FLINK_HOME": {
+ "envName": null,
+ "propertyName": null,
+ "defaultValue": "",
+ "description": "Location of flink distribution",
+ "type": "string"
+ }
+ },
+ "editor": {
+ "language": "sh",
+ "editOnDblClick": false,
+ "completionSupport": false
+ }
+ }
+]
\ No newline at end of file
diff --git a/flink-cmd/src/test/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreterTest.java b/flink-cmd/src/test/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreterTest.java
new file mode 100644
index 0000000..0ca9c52
--- /dev/null
+++ b/flink-cmd/src/test/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreterTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.cmd;
+
+import org.junit.Test;
+
+public class FlinkCmdInterpreterTest {
+
+ @Test
+ public void test() {
+
+ }
+}
diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index c55de22..2807552 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -83,22 +83,6 @@
<dependency>
<groupId>org.apache.zeppelin</groupId>
- <artifactId>zeppelin-shell</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId> org.eclipse.jetty</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-python</artifactId>
<version>${project.version}</version>
<exclusions>
diff --git a/flink/interpreter/src/main/resources/interpreter-setting.json b/flink/interpreter/src/main/resources/interpreter-setting.json
index 3ce1920..0a87ddf 100644
--- a/flink/interpreter/src/main/resources/interpreter-setting.json
+++ b/flink/interpreter/src/main/resources/interpreter-setting.json
@@ -276,17 +276,5 @@
"completionKey": "TAB",
"completionSupport": true
}
- },
- {
- "group": "flink",
- "name": "cmd",
- "className": "org.apache.zeppelin.flink.FlinkCmdInterpreter",
- "properties": {
- },
- "editor": {
- "language": "sh",
- "editOnDblClick": false,
- "completionSupport": false
- }
}
]
diff --git a/pom.xml b/pom.xml
index 9552016..2972ee2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
<module>jdbc</module>
<module>file</module>
<module>flink</module>
+ <module>flink-cmd</module>
<module>ignite</module>
<module>influxdb</module>
<module>kylin</module>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 68ee2fa..4bd969b 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
@@ -111,9 +112,16 @@ public abstract class FlinkIntegrationTest {
InterpreterSetting flinkInterpreterSetting = interpreterSettingManager.getByName("flink");
assertEquals(1, flinkInterpreterSetting.getAllInterpreterGroups().size());
assertNotNull(flinkInterpreterSetting.getAllInterpreterGroups().get(0).getWebUrl());
+ }
+
+ @Test
+ public void testFlinkCmd() throws InterpreterException {
+ InterpreterSetting flinkCmdInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("flink-cmd");
+ flinkCmdInterpreterSetting.setProperty("FLINK_HOME", flinkHome);
- Interpreter flinkShellInterpreter = interpreterFactory.getInterpreter("flink.cmd", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("flink").createExecutionContext());
- interpreterResult = flinkShellInterpreter.interpret("info -c org.apache.flink.streaming.examples.wordcount.WordCount " + flinkHome + "/examples/streaming/WordCount.jar", context);
+ Interpreter flinkCmdInterpreter = interpreterFactory.getInterpreter("flink-cmd", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("flink").createExecutionContext());
+ InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+ InterpreterResult interpreterResult = flinkCmdInterpreter.interpret("info -c org.apache.flink.streaming.examples.wordcount.WordCount " + flinkHome + "/examples/streaming/WordCount.jar", context);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
}