You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ignite.apache.org by "jeado ko (JIRA)" <ji...@apache.org> on 2019/01/09 06:07:00 UTC

[jira] [Created] (IGNITE-10861) Using multiple Ignite Sink got Ignite instance has already been started Error

jeado ko created IGNITE-10861:
---------------------------------

             Summary: Using multiple Ignite Sink got Ignite instance has already been started Error
                 Key: IGNITE-10861
                 URL: https://issues.apache.org/jira/browse/IGNITE-10861
             Project: Ignite
          Issue Type: Bug
          Components: streaming
    Affects Versions: 2.7
            Reporter: jeado ko


I got the following error when I created multiple Ignite sinks in a single Flink 1.7.0 job:
{code:java}
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141)
at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731)
at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700)
at org.apache.ignite.Ignition.start(Ignition.java:348)
{code}
And this is my Flink job code:
{code:java}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.ignite.sink.flink.IgniteSink

import scala.collection.JavaConverters._

/**
 * Minimal Flink job used to reproduce the "Default Ignite instance has already
 * been started" failure reported in this issue.
 *
 * The job builds one in-memory source of single-entry maps and attaches three
 * separate `IgniteSink` instances to it. Each sink is constructed with a
 * different cache name ("testCache", "testCache2", "testCache3") but the same
 * configuration file ("ignite-test.xml").
 *
 * NOTE(review): because all three sinks share one configuration file, each sink
 * presumably tries to start an Ignite node with the same (default) instance name
 * in the same JVM - confirm against `IgniteSink.open` and the contents of
 * ignite-test.xml, which are not shown here.
 */
object IgniteSinkTestJob extends App {

  // Execution environment that the source, filters and sinks are registered on.
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Sink #1: cache "testCache", configured from ignite-test.xml.
  val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", "ignite-test.xml")
  igniteSink.setAllowOverwrite(true)
  igniteSink.setAutoFlushFrequency(10) // unit is not visible here - TODO confirm in IgniteSink docs
//  igniteSink.open(new Configuration)

  // Sink #2: cache "testCache2", same configuration file as sink #1.
  val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2", "ignite-test.xml")
  igniteSink2.setAllowOverwrite(true)
  igniteSink2.setAutoFlushFrequency(10)
//  igniteSink2.open(new Configuration)

  // Sink #3: cache "testCache3", same configuration file as sinks #1 and #2.
  val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3", "ignite-test.xml")
  igniteSink3.setAllowOverwrite(true)
  igniteSink3.setAutoFlushFrequency(10)
//  igniteSink3.open(new Configuration)

  // Fixed test data: eight single-entry Scala maps keyed by "key1", "key2" or
  // "key3", converted to java.util.Map so they match the sinks' element type.
  val source = env.fromCollection(
    Array(
      Map("key1" -> "hello1"),
      Map("key1" -> "hello11"),
      Map("key1" -> "hello144"),
      Map("key1" -> "hello1155"),
      Map("key2" -> "hello2"),
      Map("key2" -> "hello3"),
      Map("key3" -> "hello23"),
      Map("key3" -> "hello25")
    ).map(_.asJava)
  )

  // Route records by key: each branch filters the shared source on one key
  // (parallelism set to 2 on the filter step) and writes to its own sink
  // (named "sinkN", parallelism set to 1 on the sink step).
  source
    .filter(v => v.containsKey("key1"))
    .setParallelism(2)
    .addSink(igniteSink)
      .name("sink1")
      .setParallelism(1)
  source.filter(v => v.containsKey("key2"))
    .setParallelism(2)
    .addSink(igniteSink2)
      .name("sink2")
      .setParallelism(1)
  source.filter(v => v.containsKey("key3"))
    .setParallelism(2)
    .addSink(igniteSink3)
      .name("sink3")
      .setParallelism(1)

  // Submit the job; the reported exception is raised while the job runs.
  env.execute("test ignite sink")

}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)