Posted to dev@zeppelin.apache.org by GitBox <gi...@apache.org> on 2019/01/01 01:22:44 UTC

[GitHub] felixcheung closed pull request #3266: [ZEPPELIN-3914] upgrade Flink to 1.7.0

felixcheung closed pull request #3266: [ZEPPELIN-3914] upgrade Flink to 1.7.0
URL: https://github.com/apache/zeppelin/pull/3266
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is reproduced below for
the sake of provenance:

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 2cf31257ad..d3f2223432 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -50,7 +50,7 @@ At the "Interpreters" menu, you have to create a new Flink interpreter and provi
   </tr>
 </table>
 
-For more information about Flink configuration, you can find it [here](https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html).
+For more information about Flink configuration, you can find it [here](https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html).
 
 ## How to test it's working
 You can find an example of Flink usage in the Zeppelin Tutorial folder or try the following word count example, by using the [Zeppelin notebook](https://www.zeppelinhub.com/viewer/notebooks/aHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL05GTGFicy96ZXBwZWxpbi1ub3RlYm9va3MvbWFzdGVyL25vdGVib29rcy8yQVFFREs1UEMvbm90ZS5qc29u) from Till Rohrmann's presentation [Interactive data analysis with Apache Flink](http://www.slideshare.net/tillrohrmann/data-analysis-49806564) for Apache Flink Meetup.
diff --git a/flink/pom.xml b/flink/pom.xml
index 7a374f25e7..331e19cef4 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -36,7 +36,7 @@
   <properties>
     <!--library versions-->
     <interpreter.name>flink</interpreter.name>
-    <flink.version>1.5.2</flink.version>
+    <flink.version>1.7.1</flink.version>
     <flink.akka.version>2.3.7</flink.akka.version>
     <scala.macros.version>2.0.1</scala.macros.version>
     <scala.binary.version>2.11</scala.binary.version>
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
index 1694a4491c..b2d8d1679c 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
@@ -30,7 +30,7 @@ class FlinkSQLScalaInterpreter(scalaInterpreter: FlinkScalaInterpreter,
 
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
     try {
-      val table: Table = this.btenv.sql(code)
+      val table: Table = this.btenv.sqlQuery(code)
       val result = z.showData(table)
       return new InterpreterResult(InterpreterResult.Code.SUCCESS, result)
     } catch {
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 14f895962b..1d8b27e4a7 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.scala.FlinkShell._
 import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop}
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.runtime.minicluster.{MiniCluster, StandaloneMiniCluster}
+import org.apache.flink.runtime.minicluster.MiniCluster
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
@@ -45,8 +45,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
   private var flinkILoop: FlinkILoop = _
-  private var cluster: Option[Either[Either[StandaloneMiniCluster, MiniCluster],
-    ClusterClient[_]]] = _
+  private var cluster: Option[Either[MiniCluster, ClusterClient[_]]] = _
   private var scalaCompleter: ScalaCompleter = _
   private val interpreterOutput = new InterpreterOutputStream(LOGGER)
 
@@ -68,8 +67,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val (iLoop, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
-        case Some(Left(Right(_))) => configuration
+        case Some(Left(miniCluster)) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -213,10 +211,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
     if (cluster != null) {
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) =>
-          LOGGER.info("Shutdown LegacyMiniCluster")
-          legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) =>
+        case Some(Left(newMiniCluster)) =>
           LOGGER.info("Shutdown NewMiniCluster")
           newMiniCluster.close()
         case Some(Right(yarnCluster)) =>

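A note on the FlinkSQLScalaInterpreter hunk: Flink's Table API replaced sql(...) with sqlQuery(...) for queries, which is why the interpreter call changes. Below is a minimal, standalone sketch of that 1.7-era usage; the object name, registered table and query are illustrative and not part of this PR:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

object SqlQuerySketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // getTableEnvironment is the 1.7-era factory for a batch table environment.
    val btenv = TableEnvironment.getTableEnvironment(env)

    // Register a small in-memory DataSet so the query has something to read.
    val words = env.fromElements(("hello", 1), ("flink", 1), ("hello", 1))
    btenv.registerDataSet("words", words, 'word, 'freq)

    // sqlQuery(...) returns a Table, mirroring this.btenv.sqlQuery(code) in the diff.
    val table: Table = btenv.sqlQuery(
      "SELECT word, SUM(freq) AS total FROM words GROUP BY word")
    table.toDataSet[(String, Int)].print()
  }
}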

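And a note on the cluster handling in FlinkScalaInterpreter: with the StandaloneMiniCluster branch dropped, the cluster field becomes a plain Option[Either[MiniCluster, ClusterClient[_]]], so creation and shutdown each collapse to two cases. An illustrative sketch of the resulting shutdown match (the helper name is made up here, and clusterClient.shutdown() stands in for the YARN branch truncated above):

import org.apache.flink.client.program.ClusterClient
import org.apache.flink.runtime.minicluster.MiniCluster

object ClusterShutdownSketch {
  // Hypothetical helper, not from the PR: stop whichever cluster handle is held.
  def stopCluster(cluster: Option[Either[MiniCluster, ClusterClient[_]]]): Unit =
    cluster match {
      case Some(Left(miniCluster))    => miniCluster.close()      // local mini cluster
      case Some(Right(clusterClient)) => clusterClient.shutdown() // remote session, e.g. YARN
      case None                       => ()                       // nothing to stop
    }
}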
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services