You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/02/05 05:33:07 UTC

[zeppelin] branch master updated: [ZEPPELIN-5217]. Move %flink.cmd as a separated interpreter module

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cebdbf  [ZEPPELIN-5217]. Move %flink.cmd as a separated interpreter module
4cebdbf is described below

commit 4cebdbf5aa2d04d37f6c729d22bd63f16f9a4694
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Jan 22 16:18:55 2021 +0800

    [ZEPPELIN-5217]. Move %flink.cmd as a separated interpreter module
    
    ### What is this PR for?
    
    Similar as ZEPPELIN-5213, just move %flink.cmd as separated interpreter module.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5217
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4038 from zjffdu/ZEPPELIN-5217 and squashes the following commits:
    
    5e87345de [Jeff Zhang] [ZEPPELIN-5217]. Move %flink.cmd as a separated interpreter module
---
 .github/workflows/core.yml                         | 10 +--
 flink-cmd/pom.xml                                  | 82 ++++++++++++++++++++++
 .../zeppelin/flink/cmd}/FlinkCmdInterpreter.java   | 33 +--------
 .../org/apache/zeppelin/flink/cmd/YarnUtils.java   | 58 +++++++++++++++
 .../src/main/resources/interpreter-setting.json    | 21 ++++++
 .../flink/cmd/FlinkCmdInterpreterTest.java         | 28 ++++++++
 flink/interpreter/pom.xml                          | 16 -----
 .../src/main/resources/interpreter-setting.json    | 12 ----
 pom.xml                                            |  1 +
 .../zeppelin/integration/FlinkIntegrationTest.java | 12 +++-
 10 files changed, 207 insertions(+), 66 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index c0e7c21..5d5871e 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -66,7 +66,7 @@ jobs:
   test-interpreter-modules:
     runs-on: ubuntu-18.04
     env:
-      INTERPRETERS: 'beam,hbase,pig,jdbc,file,flink,ignite,kylin,lens,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb'
+      INTERPRETERS: 'beam,hbase,pig,jdbc,file,flink,flink-cmd,ignite,kylin,lens,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb'
     steps:
       - name: Checkout
         uses: actions/checkout@v2
@@ -125,10 +125,10 @@ jobs:
           R -e "IRkernel::installspec()"
       - name: install environment
         run: |
-          mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/spark-dependencies,markdown,flink/interpreter,jdbc,shell -am
+          mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark-submit,spark/spark-dependencies,markdown,flink-cmd,flink/interpreter,jdbc,shell -am
           mvn package -DskipRat -pl zeppelin-plugins -amd -DskipTests -B
       - name: run tests
-        run: mvn test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest
+        run: mvn test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -DfailIfNoTests=false -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest
   test-flink-and-flink-integration-test:
     runs-on: ubuntu-18.04
     strategy:
@@ -160,10 +160,10 @@ jobs:
           auto-activate-base: false
       - name: install environment
         run: |
-          mvn install -DskipTests -DskipRat -am -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B
+          mvn install -DskipTests -DskipRat -am -pl flink/interpreter,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B
           mvn clean package -pl zeppelin-plugins -amd -DskipTests -B
       - name: run tests
-        run: mvn test -DskipRat -pl flink/interpreter,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }}
+        run: mvn test -DskipRat -pl flink/interpreter,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }},ZeppelinFlinkClusterTest${{ matrix.flink }}
   run-spark-intergration-test:
     runs-on: ubuntu-18.04
     steps:
diff --git a/flink-cmd/pom.xml b/flink-cmd/pom.xml
new file mode 100644
index 0000000..fef06da
--- /dev/null
+++ b/flink-cmd/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin-interpreter-parent</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-flink-cmd</artifactId>
+  <packaging>jar</packaging>
+  <version>0.10.0-SNAPSHOT</version>
+  <name>Zeppelin: Flink-Cmd interpreter</name>
+
+  <properties>
+    <interpreter.name>flink-cmd</interpreter.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-shell</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java b/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreter.java
similarity index 68%
rename from flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java
rename to flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreter.java
index 4558b80..e792c57 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkCmdInterpreter.java
+++ b/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreter.java
@@ -16,12 +16,8 @@
  */
 
 
-package org.apache.zeppelin.flink;
+package org.apache.zeppelin.flink.cmd;
 
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@@ -31,7 +27,6 @@ import org.apache.zeppelin.shell.ShellInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.Properties;
 
 public class FlinkCmdInterpreter extends ShellInterpreter {
@@ -81,35 +76,11 @@ public class FlinkCmdInterpreter extends ShellInterpreter {
       }
       if (text.contains("Submitted application")) {
         // yarn mode, extract yarn proxy url as flink ui link
-        buildFlinkUIInfo(text, context);
+        YarnUtils.buildFlinkUIInfo(text, context);
         isFlinkUrlSent = true;
       }
     }
 
-    private void buildFlinkUIInfo(String log, InterpreterContext context) {
-      int pos = log.lastIndexOf(" ");
-      if (pos != -1) {
-        String appId = log.substring(pos + 1);
-        try {
-          YarnClient yarnClient = YarnClient.createYarnClient();
-          yarnClient.init(new YarnConfiguration());
-          yarnClient.start();
-
-          ApplicationReport applicationReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
-          Map<String, String> infos = new java.util.HashMap<String, String>();
-          infos.put("jobUrl", applicationReport.getTrackingUrl());
-          infos.put("label", "Flink UI");
-          infos.put("tooltip", "View in Flink web UI");
-          infos.put("noteId", context.getNoteId());
-          infos.put("paraId", context.getParagraphId());
-          context.getIntpEventClient().onParaInfosReceived(infos);
-        } catch (Exception e) {
-          LOGGER.error("Fail to extract flink url", e);
-        }
-      } else {
-        LOGGER.error("Unable to extract flink url from this log: " + log);
-      }
-    }
 
     @Override
     public void onUpdate(int index, InterpreterResultMessageOutput out) {
diff --git a/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/YarnUtils.java b/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/YarnUtils.java
new file mode 100644
index 0000000..87073f3
--- /dev/null
+++ b/flink-cmd/src/main/java/org/apache/zeppelin/flink/cmd/YarnUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.cmd;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class YarnUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YarnUtils.class);
+
+  public static void buildFlinkUIInfo(String log, InterpreterContext context) {
+    int pos = log.lastIndexOf(" ");
+    if (pos != -1) {
+      String appId = log.substring(pos + 1);
+      try {
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(new YarnConfiguration());
+        yarnClient.start();
+
+        ApplicationReport applicationReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
+        Map<String, String> infos = new java.util.HashMap<String, String>();
+        infos.put("jobUrl", applicationReport.getTrackingUrl());
+        infos.put("label", "Flink UI");
+        infos.put("tooltip", "View in Flink web UI");
+        infos.put("noteId", context.getNoteId());
+        infos.put("paraId", context.getParagraphId());
+        context.getIntpEventClient().onParaInfosReceived(infos);
+      } catch (Exception e) {
+        LOGGER.error("Fail to extract flink url", e);
+      }
+    } else {
+      LOGGER.error("Unable to extract flink url from this log: " + log);
+    }
+  }
+}
diff --git a/flink-cmd/src/main/resources/interpreter-setting.json b/flink-cmd/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..0a13462
--- /dev/null
+++ b/flink-cmd/src/main/resources/interpreter-setting.json
@@ -0,0 +1,21 @@
+[
+  {
+    "group": "flink-cmd",
+    "name": "cmd",
+    "className": "org.apache.zeppelin.flink.cmd.FlinkCmdInterpreter",
+    "properties": {
+      "FLINK_HOME": {
+        "envName": null,
+        "propertyName": null,
+        "defaultValue": "",
+        "description": "Location of flink distribution",
+        "type": "string"
+      }
+    },
+    "editor": {
+      "language": "sh",
+      "editOnDblClick": false,
+      "completionSupport": false
+    }
+  }
+]
\ No newline at end of file
diff --git a/flink-cmd/src/test/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreterTest.java b/flink-cmd/src/test/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreterTest.java
new file mode 100644
index 0000000..0ca9c52
--- /dev/null
+++ b/flink-cmd/src/test/java/org/apache/zeppelin/flink/cmd/FlinkCmdInterpreterTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.cmd;
+
+import org.junit.Test;
+
+public class FlinkCmdInterpreterTest {
+
+  @Test
+  public void test() {
+
+  }
+}
diff --git a/flink/interpreter/pom.xml b/flink/interpreter/pom.xml
index c55de22..2807552 100644
--- a/flink/interpreter/pom.xml
+++ b/flink/interpreter/pom.xml
@@ -83,22 +83,6 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
-      <artifactId>zeppelin-shell</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId> org.eclipse.jetty</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
       <version>${project.version}</version>
       <exclusions>
diff --git a/flink/interpreter/src/main/resources/interpreter-setting.json b/flink/interpreter/src/main/resources/interpreter-setting.json
index 3ce1920..0a87ddf 100644
--- a/flink/interpreter/src/main/resources/interpreter-setting.json
+++ b/flink/interpreter/src/main/resources/interpreter-setting.json
@@ -276,17 +276,5 @@
       "completionKey": "TAB",
       "completionSupport": true
     }
-  },
-  {
-    "group": "flink",
-    "name": "cmd",
-    "className": "org.apache.zeppelin.flink.FlinkCmdInterpreter",
-    "properties": {
-    },
-    "editor": {
-      "language": "sh",
-      "editOnDblClick": false,
-      "completionSupport": false
-    }
   }
 ]
diff --git a/pom.xml b/pom.xml
index 9552016..2972ee2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
     <module>jdbc</module>
     <module>file</module>
     <module>flink</module>
+    <module>flink-cmd</module>
     <module>ignite</module>
     <module>influxdb</module>
     <module>kylin</module>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 68ee2fa..4bd969b 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
@@ -111,9 +112,16 @@ public abstract class FlinkIntegrationTest {
     InterpreterSetting flinkInterpreterSetting = interpreterSettingManager.getByName("flink");
     assertEquals(1, flinkInterpreterSetting.getAllInterpreterGroups().size());
     assertNotNull(flinkInterpreterSetting.getAllInterpreterGroups().get(0).getWebUrl());
+  }
+
+  @Test
+  public void testFlinkCmd() throws InterpreterException {
+    InterpreterSetting flinkCmdInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("flink-cmd");
+    flinkCmdInterpreterSetting.setProperty("FLINK_HOME", flinkHome);
 
-    Interpreter flinkShellInterpreter = interpreterFactory.getInterpreter("flink.cmd", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("flink").createExecutionContext());
-    interpreterResult = flinkShellInterpreter.interpret("info -c org.apache.flink.streaming.examples.wordcount.WordCount " + flinkHome + "/examples/streaming/WordCount.jar", context);
+    Interpreter flinkCmdInterpreter = interpreterFactory.getInterpreter("flink-cmd", new ExecutionContextBuilder().setUser("user1").setNoteId("note1").setDefaultInterpreterGroup("flink").createExecutionContext());
+    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+    InterpreterResult interpreterResult = flinkCmdInterpreter.interpret("info -c org.apache.flink.streaming.examples.wordcount.WordCount " + flinkHome + "/examples/streaming/WordCount.jar", context);
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
   }