Posted to dev@ignite.apache.org by "jeado ko (JIRA)" <ji...@apache.org> on 2019/01/09 06:07:00 UTC
[jira] [Created] (IGNITE-10861) Using multiple Ignite Sink got Ignite instance has already been started Error
jeado ko created IGNITE-10861:
---------------------------------
Summary: Using multiple Ignite Sink got Ignite instance has already been started Error
Key: IGNITE-10861
URL: https://issues.apache.org/jira/browse/IGNITE-10861
Project: Ignite
Issue Type: Bug
Components: streaming
Affects Versions: 2.7
Reporter: jeado ko
I get the following error when I create multiple Ignite sinks in a single Flink 1.7.0 job:
{code:java}
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
    at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
    at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
    at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
    at org.apache.ignite.Ignition.start(Ignition.java:348)
{code}
This is my Flink job code:
{code:java}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.ignite.sink.flink.IgniteSink
import scala.collection.JavaConverters._

object IgniteSinkTestJob extends App {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", "ignite-test.xml")
  igniteSink.setAllowOverwrite(true)
  igniteSink.setAutoFlushFrequency(10)
  // igniteSink.open(new Configuration)

  val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2", "ignite-test.xml")
  igniteSink2.setAllowOverwrite(true)
  igniteSink2.setAutoFlushFrequency(10)
  // igniteSink2.open(new Configuration)

  val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3", "ignite-test.xml")
  igniteSink3.setAllowOverwrite(true)
  igniteSink3.setAutoFlushFrequency(10)
  // igniteSink3.open(new Configuration)

  val source = env.fromCollection(
    Array(
      Map("key1" -> "hello1"),
      Map("key1" -> "hello11"),
      Map("key1" -> "hello144"),
      Map("key1" -> "hello1155"),
      Map("key2" -> "hello2"),
      Map("key2" -> "hello3"),
      Map("key3" -> "hello23"),
      Map("key3" -> "hello25")
    ).map(_.asJava)
  )

  source
    .filter(v => v.containsKey("key1"))
    .setParallelism(2)
    .addSink(igniteSink)
    .name("sink1")
    .setParallelism(1)

  source
    .filter(v => v.containsKey("key2"))
    .setParallelism(2)
    .addSink(igniteSink2)
    .name("sink2")
    .setParallelism(1)

  source
    .filter(v => v.containsKey("key3"))
    .setParallelism(2)
    .addSink(igniteSink3)
    .name("sink3")
    .setParallelism(1)

  env.execute("test ignite sink")
}
{code}
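The stack trace ends in Ignition.start, so the same message can probably be reproduced without Flink. The following is only a minimal sketch under assumptions: it is not the IgniteSink code, it assumes the three sinks end up in one JVM, and the object and method names (DuplicateInstanceSketch, secondDefaultStartFails) are made up for illustration. It starts a node with no instance name, tries to start a second one with the same (default) name, and then calls Ignition.getOrStart, which returns the running instance instead of failing.

{code:scala}
import org.apache.ignite.{IgniteException, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration

// Hypothetical stand-alone sketch; not taken from IgniteSink.
object DuplicateInstanceSketch {

  /** Tries to start another default-named node; returns true if Ignite rejects it. */
  def secondDefaultStartFails(): Boolean =
    try {
      Ignition.start(new IgniteConfiguration())
      false
    } catch {
      // Ignition.start reports the failure as an IgniteException.
      case _: IgniteException => true
    }

  def main(args: Array[String]): Unit = {
    // No instance name is set, so this node becomes the default instance of the JVM.
    Ignition.start(new IgniteConfiguration())
    try {
      println(s"second start rejected: ${secondDefaultStartFails()}")

      // getOrStart hands back the running instance with that name, if there is one.
      val reused = Ignition.getOrStart(new IgniteConfiguration())
      println(s"nodes visible to the reused instance: ${reused.cluster().nodes().size()}")
    } finally {
      // Stop every node started in this JVM.
      Ignition.stopAll(true)
    }
  }
}
{code}

If that assumption holds, either reusing the running instance or giving each instance a distinct name (IgniteConfiguration.setIgniteInstanceName) would avoid the name clash. Which of the two fits IgniteSink is a separate question.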