You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by XiangWei Huang <xw...@gmail.com> on 2017/09/13 04:21:11 UTC
got Warn message - "the expected leader session ID did not equal the
received leader session ID " when using LocalFlinkMiniCluster to interpret
scala code
dear all,
Below is the code i execute:
import java.io._
import java.net.{URL, URLClassLoader}
import java.nio.charset.Charset
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean
import com.netease.atom.common.util.logging.Logging
import com.netease.atom.interpreter.Code.Code
import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
import io.netty.buffer._
import org.apache.flink.api.scala.FlinkILoop
import org.apache.flink.client.CliFrontend
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
import scala.Console
import scala.beans.BeanProperty
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.runtime.AbstractFunction0
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
class FlinkInterpreter extends Interpreter {
private var bufferedReader: Option[BufferedReader] = None
private var jprintWriter: JPrintWriter = _
private val config = new Configuration;
private var cluster: LocalFlinkMiniCluster = _
@BeanProperty var imain: IMain = _
@BeanProperty var flinkILoop: FlinkILoop = _
private var out: ByteBufOutputStream = null
private var outBuf: ByteBuf = null
private var in: ByteBufInputStream = _
private var isRunning: AtomicBoolean = new AtomicBoolean(false)
override def isOpen: Boolean = {
isRunning.get()
}
def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
config.toMap.toMap.foreach(println)
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
val localCluster = new LocalFlinkMiniCluster(config, false)
localCluster.start(true)
val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
("localhost", localCluster.getLeaderRPCPort, localCluster)
}
/**
* Start flink cluster and create interpreter
*/
override def open: Unit = {
outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
out = new ByteBufOutputStream(outBuf)
in = new ByteBufInputStream(outBuf)
// val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
val (host, port, localCluster) = startLocalMiniCluster()
this.cluster = localCluster
val conf = cluster.configuration
println(s"Connecting to Flink cluster (host:$host,port:$port)...")
flinkILoop = new FlinkILoop(host, port, conf, None)
val settings = new Settings()
settings.usejavacp.value = true
settings.Yreplsync.value = true
flinkILoop.settings_$eq(settings)
flinkILoop.createInterpreter()
imain = flinkILoop.intp
FlinkInterpreter.ourClassloader = imain.classLoader
val benv = flinkILoop.scalaBenv
val senv = flinkILoop.scalaSenv
benv.getConfig.disableSysoutLogging()
senv.getConfig.disableSysoutLogging()
// import libraries
imain.interpret("import scala.tools.nsc.io._")
// imain.interpret("import Properties.userHome")
imain.interpret("import scala.compat.Platform.EOL")
imain.interpret("import org.apache.flink.api.scala._")
imain.interpret("import org.apache.flink.api.common.functions._")
isRunning.set(true)
}
override def interpret(line: String): InterpreterResult = {
if (line == null || line.trim.length == 0) {
return new InterpreterResult(Code.SUCCESS)
}
interpret(line.split("\n"))
}
/**
* Interprete code
* @param lines
* @return
*/
def interpret(lines: Array[String]): InterpreterResult = {
val imain: IMain = getImain
val linesToRun: Array[String] = new Array[String](lines.length + 1)
for (i <- 0 until lines.length) {
linesToRun(i) = lines(i)
}
linesToRun(lines.length) = "print(\"\")"
System.setOut(new PrintStream(out))
out.buffer().clear()
var r: Code = null
var incomplete: String = ""
var inComment: Boolean = false
for (l <- 0 until linesToRun.length) {
val s: String = linesToRun(l)
var continuation: Boolean = false
if (l + 1 < linesToRun.length) {
val nextLine: String = linesToRun(l + 1).trim
if (nextLine.isEmpty ||
nextLine.startsWith("//") ||
nextLine.startsWith("}") ||
nextLine.startsWith("object")) {
continuation = true
} else if (!inComment && nextLine.startsWith("/*")) {
inComment = true
continuation = true
} else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
inComment = false
continuation = true
} else if (nextLine.length > 1 &&
nextLine.charAt(0) == '.' &&
nextLine.charAt(1) != '.' &&
nextLine.charAt(1) != '/') {
continuation = true
} else if (inComment) {
continuation = true
}
if (continuation) {
incomplete += s + "\n"
}
}
if (!continuation) {
val currentCommand: String = incomplete
var res: Results.Result = null
try {
res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
override def apply() = {
imain.interpret(currentCommand + s)
}
}.apply())
} catch {
case e: Exception =>
logError("Interpreter Exception ", e)
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
}
r = getResultCode(res)
if (r == Code.ERROR) {
return new InterpreterResult(r, out.toString)
} else if (r eq Code.INCOMPLETE) {
incomplete += s + "\n"
} else {
incomplete = ""
}
}
}
if (r eq Code.INCOMPLETE) {
return new InterpreterResult(r, "Incomplete expression")
}
else {
return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
}
}
private def getResultCode(r: Results.Result): Code = {
if (r.isInstanceOf[Results.Success.type]) {
return Code.SUCCESS
}
else if (r.isInstanceOf[Results.Incomplete.type]) {
return Code.INCOMPLETE
}
else {
return Code.ERROR
}
}
}
}
object FlinkInterpreter extends Logging {
var ourClassloader: ClassLoader = _
def main(args: Array[String]): Unit = {
val interpreter: FlinkInterpreter = new FlinkInterpreter
val code =
"""
|val dataStream = senv.fromElements(1,2,3,4,5)
|dataStream.countWindowAll(2).sum(0).print()
|senv.execute("My streaming program")
""".stripMargin
interpreter.open
val result = interpreter.interpret(code)
}
}
The error messages i got are:
…
…
...
[WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
[INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
[INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
[INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
[INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
... 34 elided
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
... 41 more
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think you can make it start the Web Frontend via
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
In the future, this will become moot, though, when the JobManager has a proper REST API that is always there.
Best,
Aljoscha
> On 27. Sep 2017, at 11:40, XiangWei Huang <xw...@gmail.com> wrote:
>
> Hi Till,
>
> I’ve found that a StandaloneMiniCluster doesn’t startup web fronted when it is running.so,how can i cancel a running job on it with restful method.
>
> Cheers,
> Till
>
>> 在 2017年9月20日,15:43,Till Rohrmann <tr...@apache.org> 写道:
>>
>> Hi XiangWei,
>>
>> programmatically there is no nice tooling yet to cancel jobs on a dedicated cluster. What you can do is to use Flink's REST API to issue a cancel command [1]. You have to send a GET request to the target URL `/jobs/:jobid/cancel`. In the future we will improve the programmatic job control which will allow you to do these kind of things more easily.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw...@gmail.com> wrote:
>> Hi Till,
>>
>> Thanks for your answer,it worked when i use StandaloneMiniCluster,but another problem is that i can’t find a way to cancel
>> a running Flink job without shutting down the cluster,for LocalFlinkMiniCluster i can do it with below code :
>>
>> for (job <- cluster.getCurrentlyRunningJobsJava()) {
>> cluster.stopJob(job)
>> }
>>
>> Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster ?
>>
>> Best Regards,
>> XiangWei
>>
>>
>>
>>> 在 2017年9月14日,16:58,Till Rohrmann <tr...@apache.org> 写道:
>>>
>>> Hi XiangWei,
>>>
>>> the problem is that the LocalFlinkMiniCluster can no longer be used in combination with a RemoteExecutionEnvironment. The reason is that the LocalFlinkMiniCluster uses now an internal leader election service and assigns leader ids to its components. Since this is an internal service it is not possible to retrieve this information like it is the case with the ZooKeeper based leader election services.
>>>
>>> Long story short, the Flink Scala shell currently does not work with a LocalFlinkMiniCluster and would have to be fixed to work properly together with a local execution environment. Until then, I recommend starting a local standalone cluster and let the code run there.
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw...@gmail.com> wrote:
>>> dear all,
>>>
>>> Below is the code i execute:
>>>
>>> import java.io._
>>> import java.net.{URL, URLClassLoader}
>>> import java.nio.charset.Charset
>>> import java.util.Collections
>>> import java.util.concurrent.atomic.AtomicBoolean
>>>
>>> import com.netease.atom.common.util.logging.Logging
>>> import com.netease.atom.interpreter.Code.Code
>>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
>>> import io.netty.buffer._
>>> import org.apache.flink.api.scala.FlinkILoop
>>> import org.apache.flink.client.CliFrontend
>>> import org.apache.flink.client.cli.CliFrontendParser
>>> import org.apache.flink.client.program.ClusterClient
>>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
>>> import org.apache.flink.runtime.akka.AkkaUtils
>>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>>>
>>> import scala.Console
>>> import scala.beans.BeanProperty
>>> import scala.collection.JavaConversions._
>>> import scala.collection.mutable
>>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>>> import scala.runtime.AbstractFunction0
>>> import scala.tools.nsc.Settings
>>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>>
>>> class FlinkInterpreter extends Interpreter {
>>> private var bufferedReader: Option[BufferedReader] = None
>>> private var jprintWriter: JPrintWriter = _
>>> private val config = new Configuration;
>>> private var cluster: LocalFlinkMiniCluster = _
>>> @BeanProperty var imain: IMain = _
>>> @BeanProperty var flinkILoop: FlinkILoop = _
>>> private var out: ByteBufOutputStream = null
>>> private var outBuf: ByteBuf = null
>>> private var in: ByteBufInputStream = _
>>> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>>
>>> override def isOpen: Boolean = {
>>> isRunning.get()
>>> }
>>>
>>> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>>> config.toMap.toMap.foreach(println)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>>> val localCluster = new LocalFlinkMiniCluster(config, false)
>>> localCluster.start(true)
>>> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>>> println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>>> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>>> }
>>>
>>>
>>> /**
>>> * Start flink cluster and create interpreter
>>> */
>>> override def open: Unit = {
>>> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>>> out = new ByteBufOutputStream(outBuf)
>>> in = new ByteBufInputStream(outBuf)
>>> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>>> val (host, port, localCluster) = startLocalMiniCluster()
>>> this.cluster = localCluster
>>> val conf = cluster.configuration
>>> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>>> flinkILoop = new FlinkILoop(host, port, conf, None)
>>> val settings = new Settings()
>>> settings.usejavacp.value = true
>>> settings.Yreplsync.value = true
>>> flinkILoop.settings_$eq(settings)
>>> flinkILoop.createInterpreter()
>>> imain = flinkILoop.intp
>>> FlinkInterpreter.ourClassloader = imain.classLoader
>>> val benv = flinkILoop.scalaBenv
>>> val senv = flinkILoop.scalaSenv
>>> benv.getConfig.disableSysoutLogging()
>>> senv.getConfig.disableSysoutLogging()
>>> // import libraries
>>> imain.interpret("import scala.tools.nsc.io._")
>>> // imain.interpret("import Properties.userHome")
>>> imain.interpret("import scala.compat.Platform.EOL")
>>> imain.interpret("import org.apache.flink.api.scala._")
>>> imain.interpret("import org.apache.flink.api.common.functions._")
>>> isRunning.set(true)
>>> }
>>>
>>> override def interpret(line: String): InterpreterResult = {
>>> if (line == null || line.trim.length == 0) {
>>> return new InterpreterResult(Code.SUCCESS)
>>> }
>>> interpret(line.split("\n"))
>>> }
>>>
>>> /**
>>> * Interprete code
>>> * @param lines
>>> * @return
>>> */
>>> def interpret(lines: Array[String]): InterpreterResult = {
>>> val imain: IMain = getImain
>>> val linesToRun: Array[String] = new Array[String](lines.length + 1)
>>> for (i <- 0 until lines.length) {
>>> linesToRun(i) = lines(i)
>>> }
>>> linesToRun(lines.length) = "print(\"\")"
>>> System.setOut(new PrintStream(out))
>>> out.buffer().clear()
>>> var r: Code = null
>>> var incomplete: String = ""
>>> var inComment: Boolean = false
>>> for (l <- 0 until linesToRun.length) {
>>> val s: String = linesToRun(l)
>>> var continuation: Boolean = false
>>> if (l + 1 < linesToRun.length) {
>>> val nextLine: String = linesToRun(l + 1).trim
>>> if (nextLine.isEmpty ||
>>> nextLine.startsWith("//") ||
>>> nextLine.startsWith("}") ||
>>> nextLine.startsWith("object")) {
>>> continuation = true
>>> } else if (!inComment && nextLine.startsWith("/*")) {
>>> inComment = true
>>> continuation = true
>>> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>>> inComment = false
>>> continuation = true
>>> } else if (nextLine.length > 1 &&
>>> nextLine.charAt(0) == '.' &&
>>> nextLine.charAt(1) != '.' &&
>>> nextLine.charAt(1) != '/') {
>>> continuation = true
>>> } else if (inComment) {
>>> continuation = true
>>> }
>>> if (continuation) {
>>> incomplete += s + "\n"
>>> }
>>> }
>>> if (!continuation) {
>>> val currentCommand: String = incomplete
>>> var res: Results.Result = null
>>> try {
>>> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
>>> override def apply() = {
>>> imain.interpret(currentCommand + s)
>>> }
>>> }.apply())
>>> } catch {
>>> case e: Exception =>
>>> logError("Interpreter Exception ", e)
>>> return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
>>> }
>>> r = getResultCode(res)
>>> if (r == Code.ERROR) {
>>> return new InterpreterResult(r, out.toString)
>>> } else if (r eq Code.INCOMPLETE) {
>>> incomplete += s + "\n"
>>> } else {
>>> incomplete = ""
>>> }
>>> }
>>> }
>>>
>>> if (r eq Code.INCOMPLETE) {
>>> return new InterpreterResult(r, "Incomplete expression")
>>> }
>>> else {
>>> return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
>>> }
>>> }
>>>
>>> private def getResultCode(r: Results.Result): Code = {
>>> if (r.isInstanceOf[Results.Success.type]) {
>>> return Code.SUCCESS
>>> }
>>> else if (r.isInstanceOf[Results.Incomplete.type]) {
>>> return Code.INCOMPLETE
>>> }
>>> else {
>>> return Code.ERROR
>>> }
>>> }
>>>
>>> }
>>> }
>>>
>>> object FlinkInterpreter extends Logging {
>>> var ourClassloader: ClassLoader = _
>>>
>>> def main(args: Array[String]): Unit = {
>>> val interpreter: FlinkInterpreter = new FlinkInterpreter
>>> val code =
>>> """
>>> |val dataStream = senv.fromElements(1,2,3,4,5)
>>> |dataStream.countWindowAll(2).sum(0).print()
>>> |senv.execute("My streaming program")
>>> """.stripMargin
>>> interpreter.open
>>> val result = interpreter.interpret(code)
>>> }
>>> }
>>>
>>> The error messages i got are:
>>> …
>>> …
>>> ...
>>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
>>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
>>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
>>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
>>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
>>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>>> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>>> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>>> ... 34 elided
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>>> at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>>> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>>> ... 41 more
>>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>> at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>>> at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>>> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>>> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>>
>>>
>>>
>>
>>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think you can make it start the Web Frontend via
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
In the future, this will become moot, though, when the JobManager has a proper REST API that is always there.
Best,
Aljoscha
> On 27. Sep 2017, at 11:40, XiangWei Huang <xw...@gmail.com> wrote:
>
> Hi Till,
>
> I’ve found that a StandaloneMiniCluster doesn’t startup web fronted when it is running.so,how can i cancel a running job on it with restful method.
>
> Cheers,
> Till
>
>> 在 2017年9月20日,15:43,Till Rohrmann <tr...@apache.org> 写道:
>>
>> Hi XiangWei,
>>
>> programmatically there is no nice tooling yet to cancel jobs on a dedicated cluster. What you can do is to use Flink's REST API to issue a cancel command [1]. You have to send a GET request to the target URL `/jobs/:jobid/cancel`. In the future we will improve the programmatic job control which will allow you to do these kind of things more easily.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw...@gmail.com> wrote:
>> Hi Till,
>>
>> Thanks for your answer,it worked when i use StandaloneMiniCluster,but another problem is that i can’t find a way to cancel
>> a running Flink job without shutting down the cluster,for LocalFlinkMiniCluster i can do it with below code :
>>
>> for (job <- cluster.getCurrentlyRunningJobsJava()) {
>> cluster.stopJob(job)
>> }
>>
>> Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster ?
>>
>> Best Regards,
>> XiangWei
>>
>>
>>
>>> 在 2017年9月14日,16:58,Till Rohrmann <tr...@apache.org> 写道:
>>>
>>> Hi XiangWei,
>>>
>>> the problem is that the LocalFlinkMiniCluster can no longer be used in combination with a RemoteExecutionEnvironment. The reason is that the LocalFlinkMiniCluster uses now an internal leader election service and assigns leader ids to its components. Since this is an internal service it is not possible to retrieve this information like it is the case with the ZooKeeper based leader election services.
>>>
>>> Long story short, the Flink Scala shell currently does not work with a LocalFlinkMiniCluster and would have to be fixed to work properly together with a local execution environment. Until then, I recommend starting a local standalone cluster and let the code run there.
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw...@gmail.com> wrote:
>>> dear all,
>>>
>>> Below is the code i execute:
>>>
>>> import java.io._
>>> import java.net.{URL, URLClassLoader}
>>> import java.nio.charset.Charset
>>> import java.util.Collections
>>> import java.util.concurrent.atomic.AtomicBoolean
>>>
>>> import com.netease.atom.common.util.logging.Logging
>>> import com.netease.atom.interpreter.Code.Code
>>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
>>> import io.netty.buffer._
>>> import org.apache.flink.api.scala.FlinkILoop
>>> import org.apache.flink.client.CliFrontend
>>> import org.apache.flink.client.cli.CliFrontendParser
>>> import org.apache.flink.client.program.ClusterClient
>>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
>>> import org.apache.flink.runtime.akka.AkkaUtils
>>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>>>
>>> import scala.Console
>>> import scala.beans.BeanProperty
>>> import scala.collection.JavaConversions._
>>> import scala.collection.mutable
>>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>>> import scala.runtime.AbstractFunction0
>>> import scala.tools.nsc.Settings
>>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>>
>>> class FlinkInterpreter extends Interpreter {
>>> private var bufferedReader: Option[BufferedReader] = None
>>> private var jprintWriter: JPrintWriter = _
>>> private val config = new Configuration;
>>> private var cluster: LocalFlinkMiniCluster = _
>>> @BeanProperty var imain: IMain = _
>>> @BeanProperty var flinkILoop: FlinkILoop = _
>>> private var out: ByteBufOutputStream = null
>>> private var outBuf: ByteBuf = null
>>> private var in: ByteBufInputStream = _
>>> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>>
>>> override def isOpen: Boolean = {
>>> isRunning.get()
>>> }
>>>
>>> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>>> config.toMap.toMap.foreach(println)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>>> val localCluster = new LocalFlinkMiniCluster(config, false)
>>> localCluster.start(true)
>>> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>>> println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>>> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>>> }
>>>
>>>
>>> /**
>>> * Start flink cluster and create interpreter
>>> */
>>> override def open: Unit = {
>>> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>>> out = new ByteBufOutputStream(outBuf)
>>> in = new ByteBufInputStream(outBuf)
>>> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>>> val (host, port, localCluster) = startLocalMiniCluster()
>>> this.cluster = localCluster
>>> val conf = cluster.configuration
>>> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>>> flinkILoop = new FlinkILoop(host, port, conf, None)
>>> val settings = new Settings()
>>> settings.usejavacp.value = true
>>> settings.Yreplsync.value = true
>>> flinkILoop.settings_$eq(settings)
>>> flinkILoop.createInterpreter()
>>> imain = flinkILoop.intp
>>> FlinkInterpreter.ourClassloader = imain.classLoader
>>> val benv = flinkILoop.scalaBenv
>>> val senv = flinkILoop.scalaSenv
>>> benv.getConfig.disableSysoutLogging()
>>> senv.getConfig.disableSysoutLogging()
>>> // import libraries
>>> imain.interpret("import scala.tools.nsc.io._")
>>> // imain.interpret("import Properties.userHome")
>>> imain.interpret("import scala.compat.Platform.EOL")
>>> imain.interpret("import org.apache.flink.api.scala._")
>>> imain.interpret("import org.apache.flink.api.common.functions._")
>>> isRunning.set(true)
>>> }
>>>
>>> override def interpret(line: String): InterpreterResult = {
>>> if (line == null || line.trim.length == 0) {
>>> return new InterpreterResult(Code.SUCCESS)
>>> }
>>> interpret(line.split("\n"))
>>> }
>>>
>>> /**
>>> * Interprete code
>>> * @param lines
>>> * @return
>>> */
>>> def interpret(lines: Array[String]): InterpreterResult = {
>>> val imain: IMain = getImain
>>> val linesToRun: Array[String] = new Array[String](lines.length + 1)
>>> for (i <- 0 until lines.length) {
>>> linesToRun(i) = lines(i)
>>> }
>>> linesToRun(lines.length) = "print(\"\")"
>>> System.setOut(new PrintStream(out))
>>> out.buffer().clear()
>>> var r: Code = null
>>> var incomplete: String = ""
>>> var inComment: Boolean = false
>>> for (l <- 0 until linesToRun.length) {
>>> val s: String = linesToRun(l)
>>> var continuation: Boolean = false
>>> if (l + 1 < linesToRun.length) {
>>> val nextLine: String = linesToRun(l + 1).trim
>>> if (nextLine.isEmpty ||
>>> nextLine.startsWith("//") ||
>>> nextLine.startsWith("}") ||
>>> nextLine.startsWith("object")) {
>>> continuation = true
>>> } else if (!inComment && nextLine.startsWith("/*")) {
>>> inComment = true
>>> continuation = true
>>> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>>> inComment = false
>>> continuation = true
>>> } else if (nextLine.length > 1 &&
>>> nextLine.charAt(0) == '.' &&
>>> nextLine.charAt(1) != '.' &&
>>> nextLine.charAt(1) != '/') {
>>> continuation = true
>>> } else if (inComment) {
>>> continuation = true
>>> }
>>> if (continuation) {
>>> incomplete += s + "\n"
>>> }
>>> }
>>> if (!continuation) {
>>> val currentCommand: String = incomplete
>>> var res: Results.Result = null
>>> try {
>>> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
>>> override def apply() = {
>>> imain.interpret(currentCommand + s)
>>> }
>>> }.apply())
>>> } catch {
>>> case e: Exception =>
>>> logError("Interpreter Exception ", e)
>>> return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
>>> }
>>> r = getResultCode(res)
>>> if (r == Code.ERROR) {
>>> return new InterpreterResult(r, out.toString)
>>> } else if (r eq Code.INCOMPLETE) {
>>> incomplete += s + "\n"
>>> } else {
>>> incomplete = ""
>>> }
>>> }
>>> }
>>>
>>> if (r eq Code.INCOMPLETE) {
>>> return new InterpreterResult(r, "Incomplete expression")
>>> }
>>> else {
>>> return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
>>> }
>>> }
>>>
>>> private def getResultCode(r: Results.Result): Code = {
>>> if (r.isInstanceOf[Results.Success.type]) {
>>> return Code.SUCCESS
>>> }
>>> else if (r.isInstanceOf[Results.Incomplete.type]) {
>>> return Code.INCOMPLETE
>>> }
>>> else {
>>> return Code.ERROR
>>> }
>>> }
>>>
>>> }
>>> }
>>>
>>> object FlinkInterpreter extends Logging {
>>> var ourClassloader: ClassLoader = _
>>>
>>> def main(args: Array[String]): Unit = {
>>> val interpreter: FlinkInterpreter = new FlinkInterpreter
>>> val code =
>>> """
>>> |val dataStream = senv.fromElements(1,2,3,4,5)
>>> |dataStream.countWindowAll(2).sum(0).print()
>>> |senv.execute("My streaming program")
>>> """.stripMargin
>>> interpreter.open
>>> val result = interpreter.interpret(code)
>>> }
>>> }
>>>
>>> The error messages i got are:
>>> …
>>> …
>>> ...
>>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
>>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
>>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
>>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
>>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
>>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>>> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>>> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>>> ... 34 elided
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>>> at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>>> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>>> ... 41 more
>>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>> at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>>> at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>>> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>>> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>>
>>>
>>>
>>
>>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by XiangWei Huang <xw...@gmail.com>.
Hi Till,
I’ve found that a StandaloneMiniCluster doesn’t startup web fronted when it is running.so,how can i cancel a running job on it with restful method.
Cheers,
Till
> 在 2017年9月20日,15:43,Till Rohrmann <tr...@apache.org> 写道:
>
> Hi XiangWei,
>
> programmatically there is no nice tooling yet to cancel jobs on a dedicated cluster. What you can do is to use Flink's REST API to issue a cancel command [1]. You have to send a GET request to the target URL `/jobs/:jobid/cancel`. In the future we will improve the programmatic job control which will allow you to do these kind of things more easily.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation <https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation>
>
> Cheers,
> Till
>
> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw.huang.hz@gmail.com <ma...@gmail.com>> wrote:
> Hi Till,
>
> Thanks for your answer,it worked when i use StandaloneMiniCluster,but another problem is that i can’t find a way to cancel
> a running Flink job without shutting down the cluster,for LocalFlinkMiniCluster i can do it with below code :
>
> for (job <- cluster.getCurrentlyRunningJobsJava()) {
> cluster.stopJob(job)
> }
>
> Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster ?
>
> Best Regards,
> XiangWei
>
>
>
>> 在 2017年9月14日,16:58,Till Rohrmann <trohrmann@apache.org <ma...@apache.org>> 写道:
>>
>> Hi XiangWei,
>>
>> the problem is that the LocalFlinkMiniCluster can no longer be used in combination with a RemoteExecutionEnvironment. The reason is that the LocalFlinkMiniCluster uses now an internal leader election service and assigns leader ids to its components. Since this is an internal service it is not possible to retrieve this information like it is the case with the ZooKeeper based leader election services.
>>
>> Long story short, the Flink Scala shell currently does not work with a LocalFlinkMiniCluster and would have to be fixed to work properly together with a local execution environment. Until then, I recommend starting a local standalone cluster and let the code run there.
>>
>> Cheers,
>> Till
>>
>>
>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang.hz@gmail.com <ma...@gmail.com>> wrote:
>> dear all,
>>
>> Below is the code i execute:
>>
>> import java.io <http://java.io/>._
>> import java.net <http://java.net/>.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.at <http://java.util.concurrent.at/>omic.AtomicBoolean
>>
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.sc <http://org.apache.flink.api.sc/>ala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>>
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>
>> class FlinkInterpreter extends Interpreter {
>> private var bufferedReader: Option[BufferedReader] = None
>> private var jprintWriter: JPrintWriter = _
>> private val config = new Configuration;
>> private var cluster: LocalFlinkMiniCluster = _
>> @BeanProperty var imain: IMain = _
>> @BeanProperty var flinkILoop: FlinkILoop = _
>> private var out: ByteBufOutputStream = null
>> private var outBuf: ByteBuf = null
>> private var in: ByteBufInputStream = _
>> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>
>> override def isOpen: Boolean = {
>> isRunning.get()
>> }
>>
>> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>> config.toMap.toMap.foreach(println)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>> val localCluster = new LocalFlinkMiniCluster(config, false)
>> localCluster.start(true)
>> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>> println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>> }
>>
>>
>> /**
>> * Start flink cluster and create interpreter
>> */
>> override def open: Unit = {
>> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>> out = new ByteBufOutputStream(outBuf)
>> in = new ByteBufInputStream(outBuf)
>> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>> val (host, port, localCluster) = startLocalMiniCluster()
>> this.cluster = localCluster
>> val conf = cluster.configuration
>> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>> flinkILoop = new FlinkILoop(host, port, conf, None)
>> val settings = new Settings()
>> settings.usejavacp.value = true
>> settings.Yreplsync.value = true
>> flinkILoop.settings_$eq(settings)
>> flinkILoop.createInterpreter()
>> imain = flinkILoop.intp
>> FlinkInterpreter.ourClassloader = imain.classLoader
>> val benv = flinkILoop.scalaBenv
>> val senv = flinkILoop.scalaSenv
>> benv.getConfig.disableSysoutLogging()
>> senv.getConfig.disableSysoutLogging()
>> // import libraries
>> imain.interpret("import scala.tools.nsc.io <http://scala.tools.nsc.io/>._")
>> // imain.interpret("import Properties.userHome")
>> imain.interpret("import scala.compat.Platform.EOL")
>> imain.interpret("import org.apache.flink.api.scala._")
>> imain.interpret("import org.apache.flink.api.common.functions._")
>> isRunning.set(true)
>> }
>>
>> override def interpret(line: String): InterpreterResult = {
>> if (line == null || line.trim.length == 0) {
>> return new InterpreterResult(Code.SUCCESS)
>> }
>> interpret(line.split("\n"))
>> }
>>
>> /**
>> * Interprete code
>> * @param lines
>> * @return
>> */
>> def interpret(lines: Array[String]): InterpreterResult = {
>> val imain: IMain = getImain
>> val linesToRun: Array[String] = new Array[String](lines.length + 1)
>> for (i <- 0 until lines.length) {
>> linesToRun(i) = lines(i)
>> }
>> linesToRun(lines.length) = "print(\"\")"
>> System.setOut(new PrintStream(out))
>> out.buffer().clear()
>> var r: Code = null
>> var incomplete: String = ""
>> var inComment: Boolean = false
>> for (l <- 0 until linesToRun.length) {
>> val s: String = linesToRun(l)
>> var continuation: Boolean = false
>> if (l + 1 < linesToRun.length) {
>> val nextLine: String = linesToRun(l + 1).trim
>> if (nextLine.isEmpty ||
>> nextLine.startsWith("//") ||
>> nextLine.startsWith("}") ||
>> nextLine.startsWith("object")) {
>> continuation = true
>> } else if (!inComment && nextLine.startsWith("/*")) {
>> inComment = true
>> continuation = true
>> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>> inComment = false
>> continuation = true
>> } else if (nextLine.length > 1 &&
>> nextLine.charAt(0) == '.' &&
>> nextLine.charAt(1) != '.' &&
>> nextLine.charAt(1) != '/') {
>> continuation = true
>> } else if (inComment) {
>> continuation = true
>> }
>> if (continuation) {
>> incomplete += s + "\n"
>> }
>> }
>> if (!continuation) {
>> val currentCommand: String = incomplete
>> var res: Results.Result = null
>> try {
>> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
>> override def apply() = {
>> imain.interpret(currentCommand + s)
>> }
>> }.apply())
>> } catch {
>> case e: Exception =>
>> logError("Interpreter Exception ", e)
>> return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
>> }
>> r = getResultCode(res)
>> if (r == Code.ERROR) {
>> return new InterpreterResult(r, out.toString)
>> } else if (r eq Code.INCOMPLETE) {
>> incomplete += s + "\n"
>> } else {
>> incomplete = ""
>> }
>> }
>> }
>>
>> if (r eq Code.INCOMPLETE) {
>> return new InterpreterResult(r, "Incomplete expression")
>> }
>> else {
>> return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
>> }
>> }
>>
>> private def getResultCode(r: Results.Result): Code = {
>> if (r.isInstanceOf[Results.Success.type]) {
>> return Code.SUCCESS
>> }
>> else if (r.isInstanceOf[Results.Incomplete.type]) {
>> return Code.INCOMPLETE
>> }
>> else {
>> return Code.ERROR
>> }
>> }
>>
>> }
>> }
>>
>> object FlinkInterpreter extends Logging {
>> var ourClassloader: ClassLoader = _
>>
>> def main(args: Array[String]): Unit = {
>> val interpreter: FlinkInterpreter = new FlinkInterpreter
>> val code =
>> """
>> |val dataStream = senv.fromElements(1,2,3,4,5)
>> |dataStream.countWindowAll(2).sum(0).print()
>> |senv.execute("My streaming program")
>> """.stripMargin
>> interpreter.open
>> val result = interpreter.interpret(code)
>> }
>> }
>>
>> The error messages i got are:
>> …
>> …
>> ...
>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940 <>].
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>> ... 34 elided
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>> at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>> ... 41 more
>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>> at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>> at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>>
>>
>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by XiangWei Huang <xw...@gmail.com>.
Hi Till,
I’ve found that a StandaloneMiniCluster doesn’t startup web fronted when it is running.so,how can i cancel a running job on it with restful method.
Cheers,
Till
> 在 2017年9月20日,15:43,Till Rohrmann <tr...@apache.org> 写道:
>
> Hi XiangWei,
>
> programmatically there is no nice tooling yet to cancel jobs on a dedicated cluster. What you can do is to use Flink's REST API to issue a cancel command [1]. You have to send a GET request to the target URL `/jobs/:jobid/cancel`. In the future we will improve the programmatic job control which will allow you to do these kind of things more easily.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation <https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation>
>
> Cheers,
> Till
>
> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw.huang.hz@gmail.com <ma...@gmail.com>> wrote:
> Hi Till,
>
> Thanks for your answer,it worked when i use StandaloneMiniCluster,but another problem is that i can’t find a way to cancel
> a running Flink job without shutting down the cluster,for LocalFlinkMiniCluster i can do it with below code :
>
> for (job <- cluster.getCurrentlyRunningJobsJava()) {
> cluster.stopJob(job)
> }
>
> Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster ?
>
> Best Regards,
> XiangWei
>
>
>
>> 在 2017年9月14日,16:58,Till Rohrmann <trohrmann@apache.org <ma...@apache.org>> 写道:
>>
>> Hi XiangWei,
>>
>> the problem is that the LocalFlinkMiniCluster can no longer be used in combination with a RemoteExecutionEnvironment. The reason is that the LocalFlinkMiniCluster uses now an internal leader election service and assigns leader ids to its components. Since this is an internal service it is not possible to retrieve this information like it is the case with the ZooKeeper based leader election services.
>>
>> Long story short, the Flink Scala shell currently does not work with a LocalFlinkMiniCluster and would have to be fixed to work properly together with a local execution environment. Until then, I recommend starting a local standalone cluster and let the code run there.
>>
>> Cheers,
>> Till
>>
>>
>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang.hz@gmail.com <ma...@gmail.com>> wrote:
>> dear all,
>>
>> Below is the code i execute:
>>
>> import java.io <http://java.io/>._
>> import java.net <http://java.net/>.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.at <http://java.util.concurrent.at/>omic.AtomicBoolean
>>
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.sc <http://org.apache.flink.api.sc/>ala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>>
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>
>> class FlinkInterpreter extends Interpreter {
>> private var bufferedReader: Option[BufferedReader] = None
>> private var jprintWriter: JPrintWriter = _
>> private val config = new Configuration;
>> private var cluster: LocalFlinkMiniCluster = _
>> @BeanProperty var imain: IMain = _
>> @BeanProperty var flinkILoop: FlinkILoop = _
>> private var out: ByteBufOutputStream = null
>> private var outBuf: ByteBuf = null
>> private var in: ByteBufInputStream = _
>> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>
>> override def isOpen: Boolean = {
>> isRunning.get()
>> }
>>
>> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>> config.toMap.toMap.foreach(println)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>> val localCluster = new LocalFlinkMiniCluster(config, false)
>> localCluster.start(true)
>> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>> println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>> }
>>
>>
>> /**
>> * Start flink cluster and create interpreter
>> */
>> override def open: Unit = {
>> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>> out = new ByteBufOutputStream(outBuf)
>> in = new ByteBufInputStream(outBuf)
>> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>> val (host, port, localCluster) = startLocalMiniCluster()
>> this.cluster = localCluster
>> val conf = cluster.configuration
>> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>> flinkILoop = new FlinkILoop(host, port, conf, None)
>> val settings = new Settings()
>> settings.usejavacp.value = true
>> settings.Yreplsync.value = true
>> flinkILoop.settings_$eq(settings)
>> flinkILoop.createInterpreter()
>> imain = flinkILoop.intp
>> FlinkInterpreter.ourClassloader = imain.classLoader
>> val benv = flinkILoop.scalaBenv
>> val senv = flinkILoop.scalaSenv
>> benv.getConfig.disableSysoutLogging()
>> senv.getConfig.disableSysoutLogging()
>> // import libraries
>> imain.interpret("import scala.tools.nsc.io <http://scala.tools.nsc.io/>._")
>> // imain.interpret("import Properties.userHome")
>> imain.interpret("import scala.compat.Platform.EOL")
>> imain.interpret("import org.apache.flink.api.scala._")
>> imain.interpret("import org.apache.flink.api.common.functions._")
>> isRunning.set(true)
>> }
>>
>> override def interpret(line: String): InterpreterResult = {
>> if (line == null || line.trim.length == 0) {
>> return new InterpreterResult(Code.SUCCESS)
>> }
>> interpret(line.split("\n"))
>> }
>>
>> /**
>> * Interprete code
>> * @param lines
>> * @return
>> */
>> def interpret(lines: Array[String]): InterpreterResult = {
>> val imain: IMain = getImain
>> val linesToRun: Array[String] = new Array[String](lines.length + 1)
>> for (i <- 0 until lines.length) {
>> linesToRun(i) = lines(i)
>> }
>> linesToRun(lines.length) = "print(\"\")"
>> System.setOut(new PrintStream(out))
>> out.buffer().clear()
>> var r: Code = null
>> var incomplete: String = ""
>> var inComment: Boolean = false
>> for (l <- 0 until linesToRun.length) {
>> val s: String = linesToRun(l)
>> var continuation: Boolean = false
>> if (l + 1 < linesToRun.length) {
>> val nextLine: String = linesToRun(l + 1).trim
>> if (nextLine.isEmpty ||
>> nextLine.startsWith("//") ||
>> nextLine.startsWith("}") ||
>> nextLine.startsWith("object")) {
>> continuation = true
>> } else if (!inComment && nextLine.startsWith("/*")) {
>> inComment = true
>> continuation = true
>> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>> inComment = false
>> continuation = true
>> } else if (nextLine.length > 1 &&
>> nextLine.charAt(0) == '.' &&
>> nextLine.charAt(1) != '.' &&
>> nextLine.charAt(1) != '/') {
>> continuation = true
>> } else if (inComment) {
>> continuation = true
>> }
>> if (continuation) {
>> incomplete += s + "\n"
>> }
>> }
>> if (!continuation) {
>> val currentCommand: String = incomplete
>> var res: Results.Result = null
>> try {
>> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
>> override def apply() = {
>> imain.interpret(currentCommand + s)
>> }
>> }.apply())
>> } catch {
>> case e: Exception =>
>> logError("Interpreter Exception ", e)
>> return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
>> }
>> r = getResultCode(res)
>> if (r == Code.ERROR) {
>> return new InterpreterResult(r, out.toString)
>> } else if (r eq Code.INCOMPLETE) {
>> incomplete += s + "\n"
>> } else {
>> incomplete = ""
>> }
>> }
>> }
>>
>> if (r eq Code.INCOMPLETE) {
>> return new InterpreterResult(r, "Incomplete expression")
>> }
>> else {
>> return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
>> }
>> }
>>
>> private def getResultCode(r: Results.Result): Code = {
>> if (r.isInstanceOf[Results.Success.type]) {
>> return Code.SUCCESS
>> }
>> else if (r.isInstanceOf[Results.Incomplete.type]) {
>> return Code.INCOMPLETE
>> }
>> else {
>> return Code.ERROR
>> }
>> }
>>
>> }
>> }
>>
>> object FlinkInterpreter extends Logging {
>> var ourClassloader: ClassLoader = _
>>
>> def main(args: Array[String]): Unit = {
>> val interpreter: FlinkInterpreter = new FlinkInterpreter
>> val code =
>> """
>> |val dataStream = senv.fromElements(1,2,3,4,5)
>> |dataStream.countWindowAll(2).sum(0).print()
>> |senv.execute("My streaming program")
>> """.stripMargin
>> interpreter.open
>> val result = interpreter.interpret(code)
>> }
>> }
>>
>> The error messages i got are:
>> …
>> …
>> ...
>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940 <>].
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>> ... 34 elided
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>> at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>> ... 41 more
>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>> at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>> at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>>
>>
>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by Till Rohrmann <tr...@apache.org>.
Hi XiangWei,
programmatically there is no nice tooling yet to cancel jobs on a dedicated
cluster. What you can do is to use Flink's REST API to issue a cancel
command [1]. You have to send a GET request to the target URL
`/jobs/:jobid/cancel`. In the future we will improve the programmatic job
control which will allow you to do these kind of things more easily.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
Cheers,
Till
On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw...@gmail.com>
wrote:
> Hi Till,
>
> Thanks for your answer,it worked when i use *StandaloneMiniCluster,*
> but another problem is that i can’t find a way to cancel
> a running Flink job without shutting down the cluster,for
> LocalFlinkMiniCluster i can do it with below code :
>
> * for (job <- cluster.getCurrentlyRunningJobsJava()) {*
>
> * cluster.stopJob(job) }*
>
> Is it possible to cancel a running Flink job without shutting down a *StandaloneMiniCluster
> ?*
>
> Best Regards,
> XiangWei
>
>
>
> 在 2017年9月14日,16:58,Till Rohrmann <tr...@apache.org> 写道:
>
> Hi XiangWei,
>
> the problem is that the LocalFlinkMiniCluster can no longer be used in
> combination with a RemoteExecutionEnvironment. The reason is that the
> LocalFlinkMiniCluster uses now an internal leader election service and
> assigns leader ids to its components. Since this is an internal service it
> is not possible to retrieve this information like it is the case with the
> ZooKeeper based leader election services.
>
> Long story short, the Flink Scala shell currently does not work with a
> LocalFlinkMiniCluster and would have to be fixed to work properly
> together with a local execution environment. Until then, I recommend
> starting a local standalone cluster and let the code run there.
>
> Cheers,
> Till
>
>
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw...@gmail.com>
> wrote:
>
>> dear all,
>>
>> *Below is the code i execute:*
>>
>> import java.io._
>> import java.net.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.atomic.AtomicBoolean
>>
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, Inte
>> rpreterResult, InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.scala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions
>> , Configuration, ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniC
>> luster, LocalFlinkMiniCluster}
>>
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>
>> class FlinkInterpreter extends Interpreter {
>> private var bufferedReader: Option[BufferedReader] = None
>> private var jprintWriter: JPrintWriter = _
>> private val config = new Configuration;
>> private var cluster: LocalFlinkMiniCluster = _
>> @BeanProperty var imain: IMain = _
>> @BeanProperty var flinkILoop: FlinkILoop = _
>> private var out: ByteBufOutputStream = null
>> private var outBuf: ByteBuf = null
>> private var in: ByteBufInputStream = _
>> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>
>> override def isOpen: Boolean = {
>> isRunning.get()
>> }
>>
>> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>> config.toMap.toMap.foreach(println)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>> val localCluster = new LocalFlinkMiniCluster(config, false)
>> localCluster.start(true)
>> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.
>> get.head).port
>> println(s"Starting local Flink cluster (host:
>> localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>> }
>>
>>
>> /**
>> * Start flink cluster and create interpreter
>> */
>> override def open: Unit = {
>> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>> out = new ByteBufOutputStream(outBuf)
>> in = new ByteBufInputStream(outBuf)
>> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1),
>> None, None, None, Option(1), None))
>> val (host, port, localCluster) = startLocalMiniCluster()
>> this.cluster = localCluster
>> val conf = cluster.configuration
>> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>> flinkILoop = new FlinkILoop(host, port, conf, None)
>> val settings = new Settings()
>> settings.usejavacp.value = true
>> settings.Yreplsync.value = true
>> flinkILoop.settings_$eq(settings)
>> flinkILoop.createInterpreter()
>> imain = flinkILoop.intp
>> FlinkInterpreter.ourClassloader = imain.classLoader
>> val benv = flinkILoop.scalaBenv
>> val senv = flinkILoop.scalaSenv
>> benv.getConfig.disableSysoutLogging()
>> senv.getConfig.disableSysoutLogging()
>> // import libraries
>> imain.interpret("import scala.tools.nsc.io._")
>> // imain.interpret("import Properties.userHome")
>> imain.interpret("import scala.compat.Platform.EOL")
>> imain.interpret("import org.apache.flink.api.scala._")
>> imain.interpret("import org.apache.flink.api.common.functions._")
>> isRunning.set(true)
>> }
>>
>> override def interpret(line: String): InterpreterResult = {
>> if (line == null || line.trim.length == 0) {
>> return new InterpreterResult(Code.SUCCESS)
>> }
>> interpret(line.split("\n"))
>> }
>>
>> /**
>> * Interprete code
>> * @param lines
>> * @return
>> */
>> def interpret(lines: Array[String]): InterpreterResult = {
>> val imain: IMain = getImain
>> val linesToRun: Array[String] = new Array[String](lines.length + 1)
>> for (i <- 0 until lines.length) {
>> linesToRun(i) = lines(i)
>> }
>> linesToRun(lines.length) = "print(\"\")"
>> System.setOut(new PrintStream(out))
>> out.buffer().clear()
>> var r: Code = null
>> var incomplete: String = ""
>> var inComment: Boolean = false
>> for (l <- 0 until linesToRun.length) {
>> val s: String = linesToRun(l)
>> var continuation: Boolean = false
>> if (l + 1 < linesToRun.length) {
>> val nextLine: String = linesToRun(l + 1).trim
>> if (nextLine.isEmpty ||
>> nextLine.startsWith("//") ||
>> nextLine.startsWith("}") ||
>> nextLine.startsWith("object")) {
>> continuation = true
>> } else if (!inComment && nextLine.startsWith("/*")) {
>> inComment = true
>> continuation = true
>> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>> inComment = false
>> continuation = true
>> } else if (nextLine.length > 1 &&
>> nextLine.charAt(0) == '.' &&
>> nextLine.charAt(1) != '.' &&
>> nextLine.charAt(1) != '/') {
>> continuation = true
>> } else if (inComment) {
>> continuation = true
>> }
>> if (continuation) {
>> incomplete += s + "\n"
>> }
>> }
>> if (!continuation) {
>> val currentCommand: String = incomplete
>> var res: Results.Result = null
>> try {
>> res = Console.withOut(System.out)(ne
>> w AbstractFunction0[Results.Result] {
>> override def apply() = {
>> imain.interpret(currentCommand + s)
>> }
>> }.apply())
>> } catch {
>> case e: Exception =>
>> logError("Interpreter Exception ", e)
>> return new InterpreterResult(Code.ERR
>> OR, InterpreterUtils.getMostRelevantMessage(e))
>> }
>> r = getResultCode(res)
>> if (r == Code.ERROR) {
>> return new InterpreterResult(r, out.toString)
>> } else if (r eq Code.INCOMPLETE) {
>> incomplete += s + "\n"
>> } else {
>> incomplete = ""
>> }
>> }
>> }
>>
>> if (r eq Code.INCOMPLETE) {
>> return new InterpreterResult(r, "Incomplete expression")
>> }
>> else {
>> return new InterpreterResult(r, out.buffer().toString(Charset.forNa
>> me("utf-8")))
>> }
>> }
>>
>> private def getResultCode(r: Results.Result): Code = {
>> if (r.isInstanceOf[Results.Success.type]) {
>> return Code.SUCCESS
>> }
>> else if (r.isInstanceOf[Results.Incomplete.type]) {
>> return Code.INCOMPLETE
>> }
>> else {
>> return Code.ERROR
>> }
>> }
>>
>> }
>> }
>>
>> object FlinkInterpreter extends Logging {
>> var ourClassloader: ClassLoader = _
>>
>> def main(args: Array[String]): Unit = {
>> val interpreter: FlinkInterpreter = new FlinkInterpreter
>> val code =
>> """
>> |val dataStream = senv.fromElements(1,2,3,4,5)
>> |dataStream.countWindowAll(2).sum(0).print()
>> |senv.execute("My streaming program")
>> """.stripMargin
>> interpreter.open
>> val result = interpreter.interpret(code)
>> }
>> }
>>
>> *The error messages i got are:*
>> …
>> …
>> ...
>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager]
>> Discard message LeaderSessionMessage(00000000-
>> 0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
>> 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES))
>> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7
>> did not equal the received leader session ID 00000000-0000-0000-0000-000000
>> 000000.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor]
>> Terminate JobClientActor.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor]
>> Disconnect from JobManager Actor[akka.tcp://flink@localho
>> st:63522/user/jobmanager#82627940].
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Shutting down remote daemon.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Remote daemon shut down; proceeding with flushing remote transports.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Remoting shut down.
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Couldn't retrieve the JobExecutionResult from the
>> JobManager.
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:478)
>> at org.apache.flink.client.program.StandaloneClusterClient.subm
>> itJob(StandaloneClusterClient.java:105)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:442)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:434)
>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvir
>> onment.executeRemotely(RemoteStreamEnvironment.java:212)
>> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.
>> executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvir
>> onment.execute(RemoteStreamEnvironment.java:176)
>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironm
>> ent.execute(StreamExecutionEnvironment.scala:638)
>> ... 34 elided
>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>> Couldn't retrieve the JobExecutionResult from the JobManager.
>> at org.apache.flink.runtime.client.JobClient.awaitJobResult(
>> JobClient.java:309)
>> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(
>> JobClient.java:396)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:467)
>> ... 41 more
>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
>> Job submission to the JobManager timed out. You may increase
>> 'akka.client.timeout' in case the JobManager needs more time to configure
>> and confirm the job submission.
>> at org.apache.flink.runtime.client.JobSubmissionClientActor.han
>> dleCustomMessage(JobSubmissionClientActor.java:119)
>> at org.apache.flink.runtime.client.JobClientActor.handleMessage
>> (JobClientActor.java:251)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeader
>> SessionID(FlinkUntypedActor.java:89)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(Fl
>> inkUntypedActor.java:68)
>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(Untyp
>> edActor.scala:167)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
>> exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>> l.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>> orkerThread.java:107)
>>
>>
>>
>>
>>
>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by Till Rohrmann <tr...@apache.org>.
Hi XiangWei,
programmatically there is no nice tooling yet to cancel jobs on a dedicated
cluster. What you can do is to use Flink's REST API to issue a cancel
command [1]. You have to send a GET request to the target URL
`/jobs/:jobid/cancel`. In the future we will improve the programmatic job
control which will allow you to do these kind of things more easily.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
Cheers,
Till
On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw...@gmail.com>
wrote:
> Hi Till,
>
> Thanks for your answer,it worked when i use *StandaloneMiniCluster,*
> but another problem is that i can’t find a way to cancel
> a running Flink job without shutting down the cluster,for
> LocalFlinkMiniCluster i can do it with below code :
>
> * for (job <- cluster.getCurrentlyRunningJobsJava()) {*
>
> * cluster.stopJob(job) }*
>
> Is it possible to cancel a running Flink job without shutting down a *StandaloneMiniCluster
> ?*
>
> Best Regards,
> XiangWei
>
>
>
> 在 2017年9月14日,16:58,Till Rohrmann <tr...@apache.org> 写道:
>
> Hi XiangWei,
>
> the problem is that the LocalFlinkMiniCluster can no longer be used in
> combination with a RemoteExecutionEnvironment. The reason is that the
> LocalFlinkMiniCluster uses now an internal leader election service and
> assigns leader ids to its components. Since this is an internal service it
> is not possible to retrieve this information like it is the case with the
> ZooKeeper based leader election services.
>
> Long story short, the Flink Scala shell currently does not work with a
> LocalFlinkMiniCluster and would have to be fixed to work properly
> together with a local execution environment. Until then, I recommend
> starting a local standalone cluster and let the code run there.
>
> Cheers,
> Till
>
>
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw...@gmail.com>
> wrote:
>
>> dear all,
>>
>> *Below is the code i execute:*
>>
>> import java.io._
>> import java.net.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.atomic.AtomicBoolean
>>
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, Inte
>> rpreterResult, InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.scala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions
>> , Configuration, ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniC
>> luster, LocalFlinkMiniCluster}
>>
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>
>> class FlinkInterpreter extends Interpreter {
>> private var bufferedReader: Option[BufferedReader] = None
>> private var jprintWriter: JPrintWriter = _
>> private val config = new Configuration;
>> private var cluster: LocalFlinkMiniCluster = _
>> @BeanProperty var imain: IMain = _
>> @BeanProperty var flinkILoop: FlinkILoop = _
>> private var out: ByteBufOutputStream = null
>> private var outBuf: ByteBuf = null
>> private var in: ByteBufInputStream = _
>> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>
>> override def isOpen: Boolean = {
>> isRunning.get()
>> }
>>
>> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>> config.toMap.toMap.foreach(println)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>> val localCluster = new LocalFlinkMiniCluster(config, false)
>> localCluster.start(true)
>> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.
>> get.head).port
>> println(s"Starting local Flink cluster (host:
>> localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>> }
>>
>>
>> /**
>> * Start flink cluster and create interpreter
>> */
>> override def open: Unit = {
>> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>> out = new ByteBufOutputStream(outBuf)
>> in = new ByteBufInputStream(outBuf)
>> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1),
>> None, None, None, Option(1), None))
>> val (host, port, localCluster) = startLocalMiniCluster()
>> this.cluster = localCluster
>> val conf = cluster.configuration
>> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>> flinkILoop = new FlinkILoop(host, port, conf, None)
>> val settings = new Settings()
>> settings.usejavacp.value = true
>> settings.Yreplsync.value = true
>> flinkILoop.settings_$eq(settings)
>> flinkILoop.createInterpreter()
>> imain = flinkILoop.intp
>> FlinkInterpreter.ourClassloader = imain.classLoader
>> val benv = flinkILoop.scalaBenv
>> val senv = flinkILoop.scalaSenv
>> benv.getConfig.disableSysoutLogging()
>> senv.getConfig.disableSysoutLogging()
>> // import libraries
>> imain.interpret("import scala.tools.nsc.io._")
>> // imain.interpret("import Properties.userHome")
>> imain.interpret("import scala.compat.Platform.EOL")
>> imain.interpret("import org.apache.flink.api.scala._")
>> imain.interpret("import org.apache.flink.api.common.functions._")
>> isRunning.set(true)
>> }
>>
>> override def interpret(line: String): InterpreterResult = {
>> if (line == null || line.trim.length == 0) {
>> return new InterpreterResult(Code.SUCCESS)
>> }
>> interpret(line.split("\n"))
>> }
>>
>> /**
>> * Interprete code
>> * @param lines
>> * @return
>> */
>> def interpret(lines: Array[String]): InterpreterResult = {
>> val imain: IMain = getImain
>> val linesToRun: Array[String] = new Array[String](lines.length + 1)
>> for (i <- 0 until lines.length) {
>> linesToRun(i) = lines(i)
>> }
>> linesToRun(lines.length) = "print(\"\")"
>> System.setOut(new PrintStream(out))
>> out.buffer().clear()
>> var r: Code = null
>> var incomplete: String = ""
>> var inComment: Boolean = false
>> for (l <- 0 until linesToRun.length) {
>> val s: String = linesToRun(l)
>> var continuation: Boolean = false
>> if (l + 1 < linesToRun.length) {
>> val nextLine: String = linesToRun(l + 1).trim
>> if (nextLine.isEmpty ||
>> nextLine.startsWith("//") ||
>> nextLine.startsWith("}") ||
>> nextLine.startsWith("object")) {
>> continuation = true
>> } else if (!inComment && nextLine.startsWith("/*")) {
>> inComment = true
>> continuation = true
>> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>> inComment = false
>> continuation = true
>> } else if (nextLine.length > 1 &&
>> nextLine.charAt(0) == '.' &&
>> nextLine.charAt(1) != '.' &&
>> nextLine.charAt(1) != '/') {
>> continuation = true
>> } else if (inComment) {
>> continuation = true
>> }
>> if (continuation) {
>> incomplete += s + "\n"
>> }
>> }
>> if (!continuation) {
>> val currentCommand: String = incomplete
>> var res: Results.Result = null
>> try {
>> res = Console.withOut(System.out)(ne
>> w AbstractFunction0[Results.Result] {
>> override def apply() = {
>> imain.interpret(currentCommand + s)
>> }
>> }.apply())
>> } catch {
>> case e: Exception =>
>> logError("Interpreter Exception ", e)
>> return new InterpreterResult(Code.ERR
>> OR, InterpreterUtils.getMostRelevantMessage(e))
>> }
>> r = getResultCode(res)
>> if (r == Code.ERROR) {
>> return new InterpreterResult(r, out.toString)
>> } else if (r eq Code.INCOMPLETE) {
>> incomplete += s + "\n"
>> } else {
>> incomplete = ""
>> }
>> }
>> }
>>
>> if (r eq Code.INCOMPLETE) {
>> return new InterpreterResult(r, "Incomplete expression")
>> }
>> else {
>> return new InterpreterResult(r, out.buffer().toString(Charset.forNa
>> me("utf-8")))
>> }
>> }
>>
>> private def getResultCode(r: Results.Result): Code = {
>> if (r.isInstanceOf[Results.Success.type]) {
>> return Code.SUCCESS
>> }
>> else if (r.isInstanceOf[Results.Incomplete.type]) {
>> return Code.INCOMPLETE
>> }
>> else {
>> return Code.ERROR
>> }
>> }
>>
>> }
>> }
>>
>> object FlinkInterpreter extends Logging {
>> var ourClassloader: ClassLoader = _
>>
>> def main(args: Array[String]): Unit = {
>> val interpreter: FlinkInterpreter = new FlinkInterpreter
>> val code =
>> """
>> |val dataStream = senv.fromElements(1,2,3,4,5)
>> |dataStream.countWindowAll(2).sum(0).print()
>> |senv.execute("My streaming program")
>> """.stripMargin
>> interpreter.open
>> val result = interpreter.interpret(code)
>> }
>> }
>>
>> *The error messages i got are:*
>> …
>> …
>> ...
>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager]
>> Discard message LeaderSessionMessage(00000000-
>> 0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
>> 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES))
>> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7
>> did not equal the received leader session ID 00000000-0000-0000-0000-000000
>> 000000.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor]
>> Terminate JobClientActor.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor]
>> Disconnect from JobManager Actor[akka.tcp://flink@localho
>> st:63522/user/jobmanager#82627940].
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Shutting down remote daemon.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Remote daemon shut down; proceeding with flushing remote transports.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Remoting shut down.
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Couldn't retrieve the JobExecutionResult from the
>> JobManager.
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:478)
>> at org.apache.flink.client.program.StandaloneClusterClient.subm
>> itJob(StandaloneClusterClient.java:105)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:442)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:434)
>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvir
>> onment.executeRemotely(RemoteStreamEnvironment.java:212)
>> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.
>> executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>> at org.apache.flink.streaming.api.environment.RemoteStreamEnvir
>> onment.execute(RemoteStreamEnvironment.java:176)
>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironm
>> ent.execute(StreamExecutionEnvironment.scala:638)
>> ... 34 elided
>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>> Couldn't retrieve the JobExecutionResult from the JobManager.
>> at org.apache.flink.runtime.client.JobClient.awaitJobResult(
>> JobClient.java:309)
>> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(
>> JobClient.java:396)
>> at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:467)
>> ... 41 more
>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
>> Job submission to the JobManager timed out. You may increase
>> 'akka.client.timeout' in case the JobManager needs more time to configure
>> and confirm the job submission.
>> at org.apache.flink.runtime.client.JobSubmissionClientActor.han
>> dleCustomMessage(JobSubmissionClientActor.java:119)
>> at org.apache.flink.runtime.client.JobClientActor.handleMessage
>> (JobClientActor.java:251)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeader
>> SessionID(FlinkUntypedActor.java:89)
>> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(Fl
>> inkUntypedActor.java:68)
>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(Untyp
>> edActor.scala:167)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
>> exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>> l.java:1979)
>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>> orkerThread.java:107)
>>
>>
>>
>>
>>
>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by XiangWei Huang <xw...@gmail.com>.
Hi Till,
Thanks for your answer,it worked when i use StandaloneMiniCluster,but another problem is that i can’t find a way to cancel
a running Flink job without shutting down the cluster,for LocalFlinkMiniCluster i can do it with below code :
for (job <- cluster.getCurrentlyRunningJobsJava()) {
cluster.stopJob(job)
}
Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster ?
Best Regards,
XiangWei
> 在 2017年9月14日,16:58,Till Rohrmann <tr...@apache.org> 写道:
>
> Hi XiangWei,
>
> the problem is that the LocalFlinkMiniCluster can no longer be used in combination with a RemoteExecutionEnvironment. The reason is that the LocalFlinkMiniCluster uses now an internal leader election service and assigns leader ids to its components. Since this is an internal service it is not possible to retrieve this information like it is the case with the ZooKeeper based leader election services.
>
> Long story short, the Flink Scala shell currently does not work with a LocalFlinkMiniCluster and would have to be fixed to work properly together with a local execution environment. Until then, I recommend starting a local standalone cluster and let the code run there.
>
> Cheers,
> Till
>
>
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang.hz@gmail.com <ma...@gmail.com>> wrote:
> dear all,
>
> Below is the code i execute:
>
> import java.io <http://java.io/>._
> import java.net <http://java.net/>.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
>
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>
> class FlinkInterpreter extends Interpreter {
> private var bufferedReader: Option[BufferedReader] = None
> private var jprintWriter: JPrintWriter = _
> private val config = new Configuration;
> private var cluster: LocalFlinkMiniCluster = _
> @BeanProperty var imain: IMain = _
> @BeanProperty var flinkILoop: FlinkILoop = _
> private var out: ByteBufOutputStream = null
> private var outBuf: ByteBuf = null
> private var in: ByteBufInputStream = _
> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>
> override def isOpen: Boolean = {
> isRunning.get()
> }
>
> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
> config.toMap.toMap.foreach(println)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> val localCluster = new LocalFlinkMiniCluster(config, false)
> localCluster.start(true)
> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
> println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
> ("localhost", localCluster.getLeaderRPCPort, localCluster)
> }
>
>
> /**
> * Start flink cluster and create interpreter
> */
> override def open: Unit = {
> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
> out = new ByteBufOutputStream(outBuf)
> in = new ByteBufInputStream(outBuf)
> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
> val (host, port, localCluster) = startLocalMiniCluster()
> this.cluster = localCluster
> val conf = cluster.configuration
> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
> flinkILoop = new FlinkILoop(host, port, conf, None)
> val settings = new Settings()
> settings.usejavacp.value = true
> settings.Yreplsync.value = true
> flinkILoop.settings_$eq(settings)
> flinkILoop.createInterpreter()
> imain = flinkILoop.intp
> FlinkInterpreter.ourClassloader = imain.classLoader
> val benv = flinkILoop.scalaBenv
> val senv = flinkILoop.scalaSenv
> benv.getConfig.disableSysoutLogging()
> senv.getConfig.disableSysoutLogging()
> // import libraries
> imain.interpret("import scala.tools.nsc.io <http://scala.tools.nsc.io/>._")
> // imain.interpret("import Properties.userHome")
> imain.interpret("import scala.compat.Platform.EOL")
> imain.interpret("import org.apache.flink.api.scala._")
> imain.interpret("import org.apache.flink.api.common.functions._")
> isRunning.set(true)
> }
>
> override def interpret(line: String): InterpreterResult = {
> if (line == null || line.trim.length == 0) {
> return new InterpreterResult(Code.SUCCESS)
> }
> interpret(line.split("\n"))
> }
>
> /**
> * Interprete code
> * @param lines
> * @return
> */
> def interpret(lines: Array[String]): InterpreterResult = {
> val imain: IMain = getImain
> val linesToRun: Array[String] = new Array[String](lines.length + 1)
> for (i <- 0 until lines.length) {
> linesToRun(i) = lines(i)
> }
> linesToRun(lines.length) = "print(\"\")"
> System.setOut(new PrintStream(out))
> out.buffer().clear()
> var r: Code = null
> var incomplete: String = ""
> var inComment: Boolean = false
> for (l <- 0 until linesToRun.length) {
> val s: String = linesToRun(l)
> var continuation: Boolean = false
> if (l + 1 < linesToRun.length) {
> val nextLine: String = linesToRun(l + 1).trim
> if (nextLine.isEmpty ||
> nextLine.startsWith("//") ||
> nextLine.startsWith("}") ||
> nextLine.startsWith("object")) {
> continuation = true
> } else if (!inComment && nextLine.startsWith("/*")) {
> inComment = true
> continuation = true
> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
> inComment = false
> continuation = true
> } else if (nextLine.length > 1 &&
> nextLine.charAt(0) == '.' &&
> nextLine.charAt(1) != '.' &&
> nextLine.charAt(1) != '/') {
> continuation = true
> } else if (inComment) {
> continuation = true
> }
> if (continuation) {
> incomplete += s + "\n"
> }
> }
> if (!continuation) {
> val currentCommand: String = incomplete
> var res: Results.Result = null
> try {
> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
> override def apply() = {
> imain.interpret(currentCommand + s)
> }
> }.apply())
> } catch {
> case e: Exception =>
> logError("Interpreter Exception ", e)
> return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
> }
> r = getResultCode(res)
> if (r == Code.ERROR) {
> return new InterpreterResult(r, out.toString)
> } else if (r eq Code.INCOMPLETE) {
> incomplete += s + "\n"
> } else {
> incomplete = ""
> }
> }
> }
>
> if (r eq Code.INCOMPLETE) {
> return new InterpreterResult(r, "Incomplete expression")
> }
> else {
> return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
> }
> }
>
> private def getResultCode(r: Results.Result): Code = {
> if (r.isInstanceOf[Results.Success.type]) {
> return Code.SUCCESS
> }
> else if (r.isInstanceOf[Results.Incomplete.type]) {
> return Code.INCOMPLETE
> }
> else {
> return Code.ERROR
> }
> }
>
> }
> }
>
> object FlinkInterpreter extends Logging {
> var ourClassloader: ClassLoader = _
>
> def main(args: Array[String]): Unit = {
> val interpreter: FlinkInterpreter = new FlinkInterpreter
> val code =
> """
> |val dataStream = senv.fromElements(1,2,3,4,5)
> |dataStream.countWindowAll(2).sum(0).print()
> |senv.execute("My streaming program")
> """.stripMargin
> interpreter.open
> val result = interpreter.interpret(code)
> }
> }
>
> The error messages i got are:
> …
> …
> ...
> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940 <>].
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> ... 34 elided
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
> at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
> ... 41 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
> at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
> at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by XiangWei Huang <xw...@gmail.com>.
Hi Till,
Thanks for your answer,it worked when i use StandaloneMiniCluster,but another problem is that i can��t find a way to cancel
a running Flink job without shutting down the cluster,for LocalFlinkMiniCluster i can do it with below code :
for (job <- cluster.getCurrentlyRunningJobsJava()) {
cluster.stopJob(job)
}
Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster ?
Best Regards,
XiangWei
> �� 2017��9��14�գ�16:58��Till Rohrmann <tr...@apache.org> д����
>
> Hi XiangWei,
>
> the problem is that the LocalFlinkMiniCluster can no longer be used in combination with a RemoteExecutionEnvironment. The reason is that the LocalFlinkMiniCluster uses now an internal leader election service and assigns leader ids to its components. Since this is an internal service it is not possible to retrieve this information like it is the case with the ZooKeeper based leader election services.
>
> Long story short, the Flink Scala shell currently does not work with a LocalFlinkMiniCluster and would have to be fixed to work properly together with a local execution environment. Until then, I recommend starting a local standalone cluster and let the code run there.
>
> Cheers,
> Till
>
>
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang.hz@gmail.com <ma...@gmail.com>> wrote:
> dear all,
>
> Below is the code i execute:
>
> import java.io <http://java.io/>._
> import java.net <http://java.net/>.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
>
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>
> class FlinkInterpreter extends Interpreter {
> private var bufferedReader: Option[BufferedReader] = None
> private var jprintWriter: JPrintWriter = _
> private val config = new Configuration;
> private var cluster: LocalFlinkMiniCluster = _
> @BeanProperty var imain: IMain = _
> @BeanProperty var flinkILoop: FlinkILoop = _
> private var out: ByteBufOutputStream = null
> private var outBuf: ByteBuf = null
> private var in: ByteBufInputStream = _
> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>
> override def isOpen: Boolean = {
> isRunning.get()
> }
>
> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
> config.toMap.toMap.foreach(println)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> val localCluster = new LocalFlinkMiniCluster(config, false)
> localCluster.start(true)
> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
> println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
> ("localhost", localCluster.getLeaderRPCPort, localCluster)
> }
>
>
> /**
> * Start flink cluster and create interpreter
> */
> override def open: Unit = {
> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
> out = new ByteBufOutputStream(outBuf)
> in = new ByteBufInputStream(outBuf)
> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
> val (host, port, localCluster) = startLocalMiniCluster()
> this.cluster = localCluster
> val conf = cluster.configuration
> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
> flinkILoop = new FlinkILoop(host, port, conf, None)
> val settings = new Settings()
> settings.usejavacp.value = true
> settings.Yreplsync.value = true
> flinkILoop.settings_$eq(settings)
> flinkILoop.createInterpreter()
> imain = flinkILoop.intp
> FlinkInterpreter.ourClassloader = imain.classLoader
> val benv = flinkILoop.scalaBenv
> val senv = flinkILoop.scalaSenv
> benv.getConfig.disableSysoutLogging()
> senv.getConfig.disableSysoutLogging()
> // import libraries
> imain.interpret("import scala.tools.nsc.io <http://scala.tools.nsc.io/>._")
> // imain.interpret("import Properties.userHome")
> imain.interpret("import scala.compat.Platform.EOL")
> imain.interpret("import org.apache.flink.api.scala._")
> imain.interpret("import org.apache.flink.api.common.functions._")
> isRunning.set(true)
> }
>
> override def interpret(line: String): InterpreterResult = {
> if (line == null || line.trim.length == 0) {
> return new InterpreterResult(Code.SUCCESS)
> }
> interpret(line.split("\n"))
> }
>
> /**
> * Interprete code
> * @param lines
> * @return
> */
> def interpret(lines: Array[String]): InterpreterResult = {
> val imain: IMain = getImain
> val linesToRun: Array[String] = new Array[String](lines.length + 1)
> for (i <- 0 until lines.length) {
> linesToRun(i) = lines(i)
> }
> linesToRun(lines.length) = "print(\"\")"
> System.setOut(new PrintStream(out))
> out.buffer().clear()
> var r: Code = null
> var incomplete: String = ""
> var inComment: Boolean = false
> for (l <- 0 until linesToRun.length) {
> val s: String = linesToRun(l)
> var continuation: Boolean = false
> if (l + 1 < linesToRun.length) {
> val nextLine: String = linesToRun(l + 1).trim
> if (nextLine.isEmpty ||
> nextLine.startsWith("//") ||
> nextLine.startsWith("}") ||
> nextLine.startsWith("object")) {
> continuation = true
> } else if (!inComment && nextLine.startsWith("/*")) {
> inComment = true
> continuation = true
> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
> inComment = false
> continuation = true
> } else if (nextLine.length > 1 &&
> nextLine.charAt(0) == '.' &&
> nextLine.charAt(1) != '.' &&
> nextLine.charAt(1) != '/') {
> continuation = true
> } else if (inComment) {
> continuation = true
> }
> if (continuation) {
> incomplete += s + "\n"
> }
> }
> if (!continuation) {
> val currentCommand: String = incomplete
> var res: Results.Result = null
> try {
> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
> override def apply() = {
> imain.interpret(currentCommand + s)
> }
> }.apply())
> } catch {
> case e: Exception =>
> logError("Interpreter Exception ", e)
> return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
> }
> r = getResultCode(res)
> if (r == Code.ERROR) {
> return new InterpreterResult(r, out.toString)
> } else if (r eq Code.INCOMPLETE) {
> incomplete += s + "\n"
> } else {
> incomplete = ""
> }
> }
> }
>
> if (r eq Code.INCOMPLETE) {
> return new InterpreterResult(r, "Incomplete expression")
> }
> else {
> return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
> }
> }
>
> private def getResultCode(r: Results.Result): Code = {
> if (r.isInstanceOf[Results.Success.type]) {
> return Code.SUCCESS
> }
> else if (r.isInstanceOf[Results.Incomplete.type]) {
> return Code.INCOMPLETE
> }
> else {
> return Code.ERROR
> }
> }
>
> }
> }
>
> object FlinkInterpreter extends Logging {
> var ourClassloader: ClassLoader = _
>
> def main(args: Array[String]): Unit = {
> val interpreter: FlinkInterpreter = new FlinkInterpreter
> val code =
> """
> |val dataStream = senv.fromElements(1,2,3,4,5)
> |dataStream.countWindowAll(2).sum(0).print()
> |senv.execute("My streaming program")
> """.stripMargin
> interpreter.open
> val result = interpreter.interpret(code)
> }
> }
>
> The error messages i got are:
> ��
> ��
> ...
> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940 <>].
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> ... 34 elided
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
> at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
> ... 41 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
> at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
> at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
> at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by Till Rohrmann <tr...@apache.org>.
Hi XiangWei,
the problem is that the LocalFlinkMiniCluster can no longer be used in
combination with a RemoteExecutionEnvironment. The reason is that the
LocalFlinkMiniCluster uses now an internal leader election service and
assigns leader ids to its components. Since this is an internal service it
is not possible to retrieve this information like it is the case with the
ZooKeeper based leader election services.
Long story short, the Flink Scala shell currently does not work with a
LocalFlinkMiniCluster and would have to be fixed to work properly together
with a local execution environment. Until then, I recommend starting a
local standalone cluster and let the code run there.
Cheers,
Till
On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw...@gmail.com>
wrote:
> dear all,
>
> *Below is the code i execute:*
>
> import java.io._
> import java.net.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
>
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter,
> InterpreterResult, InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions,
> Configuration, ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster,
> LocalFlinkMiniCluster}
>
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>
> class FlinkInterpreter extends Interpreter {
> private var bufferedReader: Option[BufferedReader] = None
> private var jprintWriter: JPrintWriter = _
> private val config = new Configuration;
> private var cluster: LocalFlinkMiniCluster = _
> @BeanProperty var imain: IMain = _
> @BeanProperty var flinkILoop: FlinkILoop = _
> private var out: ByteBufOutputStream = null
> private var outBuf: ByteBuf = null
> private var in: ByteBufInputStream = _
> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>
> override def isOpen: Boolean = {
> isRunning.get()
> }
>
> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
> config.toMap.toMap.foreach(println)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> val localCluster = new LocalFlinkMiniCluster(config, false)
> localCluster.start(true)
> val port = AkkaUtils.getAddress(localCluster.
> jobManagerActorSystems.get.head).port
> println(s"Starting local Flink cluster (host: localhost,port: ${
> localCluster.getLeaderRPCPort}).\n")
> ("localhost", localCluster.getLeaderRPCPort, localCluster)
> }
>
>
> /**
> * Start flink cluster and create interpreter
> */
> override def open: Unit = {
> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
> out = new ByteBufOutputStream(outBuf)
> in = new ByteBufInputStream(outBuf)
> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1),
> None, None, None, Option(1), None))
> val (host, port, localCluster) = startLocalMiniCluster()
> this.cluster = localCluster
> val conf = cluster.configuration
> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
> flinkILoop = new FlinkILoop(host, port, conf, None)
> val settings = new Settings()
> settings.usejavacp.value = true
> settings.Yreplsync.value = true
> flinkILoop.settings_$eq(settings)
> flinkILoop.createInterpreter()
> imain = flinkILoop.intp
> FlinkInterpreter.ourClassloader = imain.classLoader
> val benv = flinkILoop.scalaBenv
> val senv = flinkILoop.scalaSenv
> benv.getConfig.disableSysoutLogging()
> senv.getConfig.disableSysoutLogging()
> // import libraries
> imain.interpret("import scala.tools.nsc.io._")
> // imain.interpret("import Properties.userHome")
> imain.interpret("import scala.compat.Platform.EOL")
> imain.interpret("import org.apache.flink.api.scala._")
> imain.interpret("import org.apache.flink.api.common.functions._")
> isRunning.set(true)
> }
>
> override def interpret(line: String): InterpreterResult = {
> if (line == null || line.trim.length == 0) {
> return new InterpreterResult(Code.SUCCESS)
> }
> interpret(line.split("\n"))
> }
>
> /**
> * Interprete code
> * @param lines
> * @return
> */
> def interpret(lines: Array[String]): InterpreterResult = {
> val imain: IMain = getImain
> val linesToRun: Array[String] = new Array[String](lines.length + 1)
> for (i <- 0 until lines.length) {
> linesToRun(i) = lines(i)
> }
> linesToRun(lines.length) = "print(\"\")"
> System.setOut(new PrintStream(out))
> out.buffer().clear()
> var r: Code = null
> var incomplete: String = ""
> var inComment: Boolean = false
> for (l <- 0 until linesToRun.length) {
> val s: String = linesToRun(l)
> var continuation: Boolean = false
> if (l + 1 < linesToRun.length) {
> val nextLine: String = linesToRun(l + 1).trim
> if (nextLine.isEmpty ||
> nextLine.startsWith("//") ||
> nextLine.startsWith("}") ||
> nextLine.startsWith("object")) {
> continuation = true
> } else if (!inComment && nextLine.startsWith("/*")) {
> inComment = true
> continuation = true
> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
> inComment = false
> continuation = true
> } else if (nextLine.length > 1 &&
> nextLine.charAt(0) == '.' &&
> nextLine.charAt(1) != '.' &&
> nextLine.charAt(1) != '/') {
> continuation = true
> } else if (inComment) {
> continuation = true
> }
> if (continuation) {
> incomplete += s + "\n"
> }
> }
> if (!continuation) {
> val currentCommand: String = incomplete
> var res: Results.Result = null
> try {
> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result]
> {
> override def apply() = {
> imain.interpret(currentCommand + s)
> }
> }.apply())
> } catch {
> case e: Exception =>
> logError("Interpreter Exception ", e)
> return new InterpreterResult(Code.ERROR, InterpreterUtils.
> getMostRelevantMessage(e))
> }
> r = getResultCode(res)
> if (r == Code.ERROR) {
> return new InterpreterResult(r, out.toString)
> } else if (r eq Code.INCOMPLETE) {
> incomplete += s + "\n"
> } else {
> incomplete = ""
> }
> }
> }
>
> if (r eq Code.INCOMPLETE) {
> return new InterpreterResult(r, "Incomplete expression")
> }
> else {
> return new InterpreterResult(r, out.buffer().toString(Charset.
> forName("utf-8")))
> }
> }
>
> private def getResultCode(r: Results.Result): Code = {
> if (r.isInstanceOf[Results.Success.type]) {
> return Code.SUCCESS
> }
> else if (r.isInstanceOf[Results.Incomplete.type]) {
> return Code.INCOMPLETE
> }
> else {
> return Code.ERROR
> }
> }
>
> }
> }
>
> object FlinkInterpreter extends Logging {
> var ourClassloader: ClassLoader = _
>
> def main(args: Array[String]): Unit = {
> val interpreter: FlinkInterpreter = new FlinkInterpreter
> val code =
> """
> |val dataStream = senv.fromElements(1,2,3,4,5)
> |dataStream.countWindowAll(2).sum(0).print()
> |senv.execute("My streaming program")
> """.stripMargin
> interpreter.open
> val result = interpreter.interpret(code)
> }
> }
>
> *The error messages i got are:*
> …
> …
> ...
> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager]
> Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
> 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES))
> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7
> did not equal the received leader session ID 00000000-0000-0000-0000-
> 000000000000.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor]
> Terminate JobClientActor.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor]
> Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/
> jobmanager#82627940].
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
> Shutting down remote daemon.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
> Remote daemon shut down; proceeding with flushing remote transports.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
> Remoting shut down.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Couldn't retrieve the JobExecutionResult from the
> JobManager.
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:478)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(
> StandaloneClusterClient.java:105)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:442)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:434)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.
> executeRemotely(RemoteStreamEnvironment.java:212)
> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironm
> ent.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.
> execute(RemoteStreamEnvironment.java:176)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
> execute(StreamExecutionEnvironment.scala:638)
> ... 34 elided
> Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Couldn't retrieve the JobExecutionResult from the JobManager.
> at org.apache.flink.runtime.client.JobClient.
> awaitJobResult(JobClient.java:309)
> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.
> java:396)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:467)
> ... 41 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to configure
> and confirm the job submission.
> at org.apache.flink.runtime.client.JobSubmissionClientActor.
> handleCustomMessage(JobSubmissionClientActor.java:119)
> at org.apache.flink.runtime.client.JobClientActor.
> handleMessage(JobClientActor.java:251)
> at org.apache.flink.runtime.akka.FlinkUntypedActor.
> handleLeaderSessionID(FlinkUntypedActor.java:89)
> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(
> FlinkUntypedActor.java:68)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
> UntypedActor.scala:167)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
>
>
>
>
Re: got Warn message - "the expected leader session ID did not equal
the received leader session ID " when using LocalFlinkMiniCluster to
interpret scala code
Posted by Till Rohrmann <tr...@apache.org>.
Hi XiangWei,
the problem is that the LocalFlinkMiniCluster can no longer be used in
combination with a RemoteExecutionEnvironment. The reason is that the
LocalFlinkMiniCluster uses now an internal leader election service and
assigns leader ids to its components. Since this is an internal service it
is not possible to retrieve this information like it is the case with the
ZooKeeper based leader election services.
Long story short, the Flink Scala shell currently does not work with a
LocalFlinkMiniCluster and would have to be fixed to work properly together
with a local execution environment. Until then, I recommend starting a
local standalone cluster and let the code run there.
Cheers,
Till
On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw...@gmail.com>
wrote:
> dear all,
>
> *Below is the code i execute:*
>
> import java.io._
> import java.net.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
>
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter,
> InterpreterResult, InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions,
> Configuration, ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster,
> LocalFlinkMiniCluster}
>
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>
> class FlinkInterpreter extends Interpreter {
> private var bufferedReader: Option[BufferedReader] = None
> private var jprintWriter: JPrintWriter = _
> private val config = new Configuration;
> private var cluster: LocalFlinkMiniCluster = _
> @BeanProperty var imain: IMain = _
> @BeanProperty var flinkILoop: FlinkILoop = _
> private var out: ByteBufOutputStream = null
> private var outBuf: ByteBuf = null
> private var in: ByteBufInputStream = _
> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>
> override def isOpen: Boolean = {
> isRunning.get()
> }
>
> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
> config.toMap.toMap.foreach(println)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> val localCluster = new LocalFlinkMiniCluster(config, false)
> localCluster.start(true)
> val port = AkkaUtils.getAddress(localCluster.
> jobManagerActorSystems.get.head).port
> println(s"Starting local Flink cluster (host: localhost,port: ${
> localCluster.getLeaderRPCPort}).\n")
> ("localhost", localCluster.getLeaderRPCPort, localCluster)
> }
>
>
> /**
> * Start flink cluster and create interpreter
> */
> override def open: Unit = {
> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
> out = new ByteBufOutputStream(outBuf)
> in = new ByteBufInputStream(outBuf)
> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1),
> None, None, None, Option(1), None))
> val (host, port, localCluster) = startLocalMiniCluster()
> this.cluster = localCluster
> val conf = cluster.configuration
> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
> flinkILoop = new FlinkILoop(host, port, conf, None)
> val settings = new Settings()
> settings.usejavacp.value = true
> settings.Yreplsync.value = true
> flinkILoop.settings_$eq(settings)
> flinkILoop.createInterpreter()
> imain = flinkILoop.intp
> FlinkInterpreter.ourClassloader = imain.classLoader
> val benv = flinkILoop.scalaBenv
> val senv = flinkILoop.scalaSenv
> benv.getConfig.disableSysoutLogging()
> senv.getConfig.disableSysoutLogging()
> // import libraries
> imain.interpret("import scala.tools.nsc.io._")
> // imain.interpret("import Properties.userHome")
> imain.interpret("import scala.compat.Platform.EOL")
> imain.interpret("import org.apache.flink.api.scala._")
> imain.interpret("import org.apache.flink.api.common.functions._")
> isRunning.set(true)
> }
>
> override def interpret(line: String): InterpreterResult = {
> if (line == null || line.trim.length == 0) {
> return new InterpreterResult(Code.SUCCESS)
> }
> interpret(line.split("\n"))
> }
>
> /**
> * Interprete code
> * @param lines
> * @return
> */
> def interpret(lines: Array[String]): InterpreterResult = {
> val imain: IMain = getImain
> val linesToRun: Array[String] = new Array[String](lines.length + 1)
> for (i <- 0 until lines.length) {
> linesToRun(i) = lines(i)
> }
> linesToRun(lines.length) = "print(\"\")"
> System.setOut(new PrintStream(out))
> out.buffer().clear()
> var r: Code = null
> var incomplete: String = ""
> var inComment: Boolean = false
> for (l <- 0 until linesToRun.length) {
> val s: String = linesToRun(l)
> var continuation: Boolean = false
> if (l + 1 < linesToRun.length) {
> val nextLine: String = linesToRun(l + 1).trim
> if (nextLine.isEmpty ||
> nextLine.startsWith("//") ||
> nextLine.startsWith("}") ||
> nextLine.startsWith("object")) {
> continuation = true
> } else if (!inComment && nextLine.startsWith("/*")) {
> inComment = true
> continuation = true
> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
> inComment = false
> continuation = true
> } else if (nextLine.length > 1 &&
> nextLine.charAt(0) == '.' &&
> nextLine.charAt(1) != '.' &&
> nextLine.charAt(1) != '/') {
> continuation = true
> } else if (inComment) {
> continuation = true
> }
> if (continuation) {
> incomplete += s + "\n"
> }
> }
> if (!continuation) {
> val currentCommand: String = incomplete
> var res: Results.Result = null
> try {
> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result]
> {
> override def apply() = {
> imain.interpret(currentCommand + s)
> }
> }.apply())
> } catch {
> case e: Exception =>
> logError("Interpreter Exception ", e)
> return new InterpreterResult(Code.ERROR, InterpreterUtils.
> getMostRelevantMessage(e))
> }
> r = getResultCode(res)
> if (r == Code.ERROR) {
> return new InterpreterResult(r, out.toString)
> } else if (r eq Code.INCOMPLETE) {
> incomplete += s + "\n"
> } else {
> incomplete = ""
> }
> }
> }
>
> if (r eq Code.INCOMPLETE) {
> return new InterpreterResult(r, "Incomplete expression")
> }
> else {
> return new InterpreterResult(r, out.buffer().toString(Charset.
> forName("utf-8")))
> }
> }
>
> private def getResultCode(r: Results.Result): Code = {
> if (r.isInstanceOf[Results.Success.type]) {
> return Code.SUCCESS
> }
> else if (r.isInstanceOf[Results.Incomplete.type]) {
> return Code.INCOMPLETE
> }
> else {
> return Code.ERROR
> }
> }
>
> }
> }
>
> object FlinkInterpreter extends Logging {
> var ourClassloader: ClassLoader = _
>
> def main(args: Array[String]): Unit = {
> val interpreter: FlinkInterpreter = new FlinkInterpreter
> val code =
> """
> |val dataStream = senv.fromElements(1,2,3,4,5)
> |dataStream.countWindowAll(2).sum(0).print()
> |senv.execute("My streaming program")
> """.stripMargin
> interpreter.open
> val result = interpreter.interpret(code)
> }
> }
>
> *The error messages i got are:*
> …
> …
> ...
> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager]
> Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
> 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES))
> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7
> did not equal the received leader session ID 00000000-0000-0000-0000-
> 000000000000.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor]
> Terminate JobClientActor.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor]
> Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/
> jobmanager#82627940].
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
> Shutting down remote daemon.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
> Remote daemon shut down; proceeding with flushing remote transports.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator]
> Remoting shut down.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Couldn't retrieve the JobExecutionResult from the
> JobManager.
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:478)
> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(
> StandaloneClusterClient.java:105)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:442)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:434)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.
> executeRemotely(RemoteStreamEnvironment.java:212)
> at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironm
> ent.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
> at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.
> execute(RemoteStreamEnvironment.java:176)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.
> execute(StreamExecutionEnvironment.scala:638)
> ... 34 elided
> Caused by: org.apache.flink.runtime.client.JobExecutionException:
> Couldn't retrieve the JobExecutionResult from the JobManager.
> at org.apache.flink.runtime.client.JobClient.
> awaitJobResult(JobClient.java:309)
> at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.
> java:396)
> at org.apache.flink.client.program.ClusterClient.run(
> ClusterClient.java:467)
> ... 41 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
> Job submission to the JobManager timed out. You may increase
> 'akka.client.timeout' in case the JobManager needs more time to configure
> and confirm the job submission.
> at org.apache.flink.runtime.client.JobSubmissionClientActor.
> handleCustomMessage(JobSubmissionClientActor.java:119)
> at org.apache.flink.runtime.client.JobClientActor.
> handleMessage(JobClientActor.java:251)
> at org.apache.flink.runtime.akka.FlinkUntypedActor.
> handleLeaderSessionID(FlinkUntypedActor.java:89)
> at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(
> FlinkUntypedActor.java:68)
> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(
> UntypedActor.scala:167)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
>
>
>
>
>