You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/09/10 17:39:10 UTC
zeppelin git commit: [ZEPPELIN-1461] Update Flink with latest version
1.1.2
Repository: zeppelin
Updated Branches:
refs/heads/master d005c7967 -> 3a1568efc
[ZEPPELIN-1461] Update Flink with latest version 1.1.2
### What is this PR for?
Flink has had two releases since 1.0.3, we are now on 1.1.2
This includes new functionality for streaming support in repl environment.
### What type of PR is it?
Improvement
### Todos
* [x] - Update `pom.xml`
* [x] - Update single (batch) environment to batch and streaming environments
* [x] - Update Test to reflect `benv` (instead of `env`)
### What is the Jira issue?
[https://issues.apache.org/jira/browse/ZEPPELIN-1416?filter=-1](https://issues.apache.org/jira/browse/ZEPPELIN-1416?filter=-1)
### How should this be tested?
Tests for previous versions are the same as new version.
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update?
No
* Is there breaking changes for older versions?
Yes* older code written in the Flink interpreter will now have to use `benv` in place of `env`
* Does this needs documentation?
No
Author: rawkintrevo <tr...@gmail.com>
Closes #1409 from rawkintrevo/zeppelin-1461 and squashes the following commits:
78502f0 [rawkintrevo] [ZEPPELIN-1461] Retrigger build
9b2d122 [rawkintrevo] [ZEPPELIN-1461] Update Test and remove unneeded code
9921a7e [rawkintrevo] [ZEPPELIN-1461] Update Flink with latest version 1.1.2
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/3a1568ef
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/3a1568ef
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/3a1568ef
Branch: refs/heads/master
Commit: 3a1568efc9a31394292bc359bb9bf42e1eb0f9f9
Parents: d005c79
Author: rawkintrevo <tr...@gmail.com>
Authored: Thu Sep 8 22:56:27 2016 -0500
Committer: Lee moon soo <mo...@apache.org>
Committed: Sat Sep 10 10:38:57 2016 -0700
----------------------------------------------------------------------
flink/pom.xml | 2 +-
.../apache/zeppelin/flink/FlinkInterpreter.java | 38 ++++++++++++++------
.../zeppelin/flink/FlinkInterpreterTest.java | 2 +-
3 files changed, 30 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 628f542..1686d06 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -34,7 +34,7 @@
<description>Zeppelin flink support</description>
<properties>
- <flink.version>1.0.3</flink.version>
+ <flink.version>1.1.2</flink.version>
<flink.akka.version>2.3.7</flink.akka.version>
<scala.macros.version>2.0.1</scala.macros.version>
</properties>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index d3229cf..5ce3b85 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -30,6 +30,7 @@ import java.util.*;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
import scala.Console;
import scala.None;
+import scala.Option;
import scala.Some;
import scala.collection.JavaConversions;
import scala.collection.immutable.Nil;
@@ -83,14 +85,25 @@ public class FlinkInterpreter extends Interpreter {
startFlinkMiniCluster();
}
- flinkIloop = new FlinkILoop(getHost(), getPort(), (BufferedReader) null, new PrintWriter(out));
+ flinkIloop = new FlinkILoop(getHost(),
+ getPort(),
+ flinkConf,
+ (BufferedReader) null,
+ new PrintWriter(out));
+
flinkIloop.settings_$eq(createSettings());
flinkIloop.createInterpreter();
-
+
imain = flinkIloop.intp();
- org.apache.flink.api.scala.ExecutionEnvironment env = flinkIloop.scalaEnv();
- env.getConfig().disableSysoutLogging();
+ org.apache.flink.api.scala.ExecutionEnvironment benv =
+ flinkIloop.scalaBenv();
+ //new ExecutionEnvironment(remoteBenv)
+ org.apache.flink.streaming.api.scala.StreamExecutionEnvironment senv =
+ flinkIloop.scalaSenv();
+
+ senv.getConfig().disableSysoutLogging();
+ benv.getConfig().disableSysoutLogging();
// prepare bindings
imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
@@ -100,13 +113,19 @@ public class FlinkInterpreter extends Interpreter {
imain.interpret("import scala.tools.nsc.io._");
imain.interpret("import Properties.userHome");
imain.interpret("import scala.compat.Platform.EOL");
-
+
imain.interpret("import org.apache.flink.api.scala._");
imain.interpret("import org.apache.flink.api.common.functions._");
- binder.put("env", env);
- imain.interpret("val env = _binder.get(\"env\").asInstanceOf["
- + env.getClass().getName() + "]");
+
+ binder.put("benv", benv);
+ imain.interpret("val benv = _binder.get(\"benv\").asInstanceOf["
+ + benv.getClass().getName() + "]");
+
+ binder.put("senv", senv);
+ imain.interpret("val senv = _binder.get(\"senv\").asInstanceOf["
+ + senv.getClass().getName() + "]");
+
}
private boolean localMode() {
@@ -313,8 +332,6 @@ public class FlinkInterpreter extends Interpreter {
}
}
-
-
@Override
public void cancel(InterpreterContext context) {
}
@@ -354,4 +371,5 @@ public class FlinkInterpreter extends Interpreter {
static final String toString(Object o) {
return (o instanceof String) ? (String) o : "";
}
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3a1568ef/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index b6f9db6..1d8f437 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -81,7 +81,7 @@ public class FlinkInterpreterTest {
@Test
public void testWordCount() {
- flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
+ flink.interpret("val text = benv.fromElements(\"To be or not to be\")", context);
flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
InterpreterResult result = flink.interpret("counts.print()", context);
assertEquals(Code.SUCCESS, result.code());