You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/09/10 17:39:10 UTC

zeppelin git commit: [ZEPPELIN-1461] Update Flink with latest version 1.1.2

Repository: zeppelin
Updated Branches:
  refs/heads/master d005c7967 -> 3a1568efc


[ZEPPELIN-1461] Update Flink with latest version 1.1.2

### What is this PR for?
Flink has had two releases since 1.0.3, we are now on 1.1.2

This includes new functionality for streaming support in repl environment.

### What type of PR is it?
 Improvement

### Todos
* [x] - Update `pom.xml`
* [x] - Update single (batch) environment to batch and streaming environments
* [x] - Update Test to reflect `benv` (instead of `env`)

### What is the Jira issue?
[https://issues.apache.org/jira/browse/ZEPPELIN-1416?filter=-1](https://issues.apache.org/jira/browse/ZEPPELIN-1416?filter=-1)

### How should this be tested?
Tests for previous versions are the same as new version.

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update?
No
* Is there breaking changes for older versions?
Yes* older code written in the Flink interpreter will now have to use `benv` in place of `env`
* Does this needs documentation?
No

Author: rawkintrevo <tr...@gmail.com>

Closes #1409 from rawkintrevo/zeppelin-1461 and squashes the following commits:

78502f0 [rawkintrevo] [ZEPPELIN-1461] Retrigger build
9b2d122 [rawkintrevo] [ZEPPELIN-1461] Update Test and remove unneeded code
9921a7e [rawkintrevo] [ZEPPELIN-1461] Update Flink with latest version 1.1.2


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/3a1568ef
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/3a1568ef
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/3a1568ef

Branch: refs/heads/master
Commit: 3a1568efc9a31394292bc359bb9bf42e1eb0f9f9
Parents: d005c79
Author: rawkintrevo <tr...@gmail.com>
Authored: Thu Sep 8 22:56:27 2016 -0500
Committer: Lee moon soo <mo...@apache.org>
Committed: Sat Sep 10 10:38:57 2016 -0700

----------------------------------------------------------------------
 flink/pom.xml                                   |  2 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java | 38 ++++++++++++++------
 .../zeppelin/flink/FlinkInterpreterTest.java    |  2 +-
 3 files changed, 30 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 628f542..1686d06 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -34,7 +34,7 @@
   <description>Zeppelin flink support</description>
 
   <properties>
-    <flink.version>1.0.3</flink.version>
+    <flink.version>1.1.2</flink.version>
     <flink.akka.version>2.3.7</flink.akka.version>
     <scala.macros.version>2.0.1</scala.macros.version>
   </properties>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index d3229cf..5ce3b85 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -30,6 +30,7 @@ import java.util.*;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import scala.Console;
 import scala.None;
+import scala.Option;
 import scala.Some;
 import scala.collection.JavaConversions;
 import scala.collection.immutable.Nil;
@@ -83,14 +85,25 @@ public class FlinkInterpreter extends Interpreter {
       startFlinkMiniCluster();
     }
 
-    flinkIloop = new FlinkILoop(getHost(), getPort(), (BufferedReader) null, new PrintWriter(out));
+    flinkIloop = new FlinkILoop(getHost(),
+                                getPort(),
+                                flinkConf,
+                                (BufferedReader) null,
+                                new PrintWriter(out));
+
     flinkIloop.settings_$eq(createSettings());
     flinkIloop.createInterpreter();
-    
+
     imain = flinkIloop.intp();
 
-    org.apache.flink.api.scala.ExecutionEnvironment env = flinkIloop.scalaEnv();
-    env.getConfig().disableSysoutLogging();
+    org.apache.flink.api.scala.ExecutionEnvironment benv =
+            flinkIloop.scalaBenv();
+            //new ExecutionEnvironment(remoteBenv)
+    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv =
+            flinkIloop.scalaSenv();
+
+    senv.getConfig().disableSysoutLogging();
+    benv.getConfig().disableSysoutLogging();
 
     // prepare bindings
     imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
@@ -100,13 +113,19 @@ public class FlinkInterpreter extends Interpreter {
     imain.interpret("import scala.tools.nsc.io._");
     imain.interpret("import Properties.userHome");
     imain.interpret("import scala.compat.Platform.EOL");
-    
+
     imain.interpret("import org.apache.flink.api.scala._");
     imain.interpret("import org.apache.flink.api.common.functions._");
 
-    binder.put("env", env);
-    imain.interpret("val env = _binder.get(\"env\").asInstanceOf["
-        + env.getClass().getName() + "]");
+
+    binder.put("benv", benv);
+    imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf["
+            + benv.getClass().getName() + "]");
+
+    binder.put("senv", senv);
+    imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf["
+            + senv.getClass().getName() + "]");
+
   }
 
   private boolean localMode() {
@@ -313,8 +332,6 @@ public class FlinkInterpreter extends Interpreter {
     }
   }
 
-
-
   @Override
   public void cancel(InterpreterContext context) {
   }
@@ -354,4 +371,5 @@ public class FlinkInterpreter extends Interpreter {
   static final String toString(Object o) {
     return (o instanceof String) ? (String) o : "";
   }
+  
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index b6f9db6..1d8f437 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -81,7 +81,7 @@ public class FlinkInterpreterTest {
 
   @Test
   public void testWordCount() {
-    flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
+    flink.interpret("val text = benv.fromElements(\"To be or not to be\")", context);
     flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
     InterpreterResult result = flink.interpret("counts.print()", context);
     assertEquals(Code.SUCCESS, result.code());