Posted to user@ignite.apache.org by Burt Parkers <bu...@googlemail.com> on 2018/06/01 13:06:26 UTC

Apache Flink Sink + Ignite: Ouch! Argument is invalid

Hi,


I'm trying to run an Apache Flink program with the flink-ignite sink.
Everything works fine when I start the application from my IDE, but when I
submit the application to the Flink cluster I get this error:


java.lang.ExceptionInInitializerError
	at org.apache.ignite.sink.flink.IgniteSink$SinkContext.getStreamer(IgniteSink.java:201)
	at org.apache.ignite.sink.flink.IgniteSink$SinkContext.access$100(IgniteSink.java:175)
	at org.apache.ignite.sink.flink.IgniteSink.invoke(IgniteSink.java:165)
	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:97)
	at org.myorg.quickstart.InstrumentStreamer$Splitter.flatMap(InstrumentStreamer.java:1)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:560)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:535)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:515)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Ouch! Argument is
invalid: Cache name must not be null or empty.
	at org.apache.ignite.internal.util.GridArgumentCheck.ensure(GridArgumentCheck.java:109)
	at org.apache.ignite.internal.processors.cache.GridCacheUtils.validateCacheName(GridCacheUtils.java:1581)
	at org.apache.ignite.internal.IgniteKernal.dataStreamer(IgniteKernal.java:3284)
	at org.apache.ignite.sink.flink.IgniteSink$SinkContext$Holder.<clinit>(IgniteSink.java:183)
	... 27 more


but the cache name is set:


https://github.com/bpark/flink-ignite-demo/blob/master/src/main/java/com/github/bpark/InstrumentStreamer.java


and configured:


https://github.com/bpark/flink-ignite-demo/blob/master/src/main/resources/flink-config.xml


ignite-flink, ignite-spring, ignite-indexing and ignite-core are included
in the shaded jar file.


The example project is located here:
https://github.com/bpark/flink-ignite-demo


Any idea what's wrong here?


- Burt

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Ray <ra...@cisco.com>.
Hello Saikat,

Thanks for the fix.
I validated the fix by running my WordCount application in both standalone
mode and cluster mode, and the data is inserted now.

But I found another problem.
The data written into Ignite is not correct.
My application counts word occurrences in the following sentences:
         "To be, or not to be,--that is the question:--",
         "Whether 'tis nobler in the mind to suffer",
         "The slings and arrows of outrageous fortune",
         "Or to take arms against a sea of troubles,

The count for the word "to" should be 9.
But when I check the result in Ignite, the value for every word is 1.
Clearly that's wrong.
The reproducer program is the same one I attached to the JIRA ticket.

Please let me know if you can reproduce this issue.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi,

I have submitted the PR below; please review and share feedback.


Jira: https://issues.apache.org/jira/browse/IGNITE-8697
PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat


On Thu, Jul 26, 2018 at 11:26 PM, Saikat Maitra <sa...@gmail.com>
wrote:

> Hi Andrew,
>
> As we discussed I have updated the PR, please take a look. If it looks
> good then I can go ahead and merge the changes.
>
> PR : https://github.com/apache/ignite/pull/4398
> Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695
>
> Regards,
> Saikat
>
> On Thu, Jul 26, 2018 at 11:25 PM, Saikat Maitra <sa...@gmail.com>
> wrote:
>
>> Hi Ray,
>>
>> We will need to use igniteSink.setAllowOverwrite(true) flag so that
>> latest computed values are stored in cache. Also we need not call
>> igniteSink.open(new Configuration)
>>
>> Please take a look into the below modified wordCount sample.
>>
>> https://github.com/samaitra/flink-fn/blob/master/flink-fn/src/main/scala/com/samaitra/WordCount.scala
>>
>> Please review and share feedback
>>
>> Regards
>> Saikat
>>
>> On Thu, Jul 26, 2018 at 1:16 AM, Ray <ra...@cisco.com> wrote:
>>
>>> Hi Saikat,
>>>
>>> The results flink calculated before sending to sink is correct, but the
>>> results in Ignite is not correct.
>>> You can remove the sink and print the stream content to validate my
>>> point.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-ignite-developers.2346864.n4.nabble.com/
>>>
>>
>>
>

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Andrew,

As we discussed I have updated the PR, please take a look. If it looks good
then I can go ahead and merge the changes.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat

On Thu, Jul 26, 2018 at 11:25 PM, Saikat Maitra <sa...@gmail.com>
wrote:

> Hi Ray,
>
> We will need to use igniteSink.setAllowOverwrite(true) flag so that
> latest computed values are stored in cache. Also we need not call
> igniteSink.open(new Configuration)
>
> Please take a look into the below modified wordCount sample.
>
> https://github.com/samaitra/flink-fn/blob/master/flink-fn/src/main/scala/com/samaitra/WordCount.scala
>
> Please review and share feedback
>
> Regards
> Saikat
>
> On Thu, Jul 26, 2018 at 1:16 AM, Ray <ra...@cisco.com> wrote:
>
>> Hi Saikat,
>>
>> The results flink calculated before sending to sink is correct, but the
>> results in Ignite is not correct.
>> You can remove the sink and print the stream content to validate my point.
>>
>>
>>
>> --
>> Sent from: http://apache-ignite-developers.2346864.n4.nabble.com/
>>
>
>

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Ray,

We will need to use the igniteSink.setAllowOverwrite(true) flag so that the
latest computed values are stored in the cache; with allowOverwrite(false) the
underlying data streamer does not replace entries that already exist, which is
why every word ended up with the value 1. Also, we need not call
igniteSink.open(new Configuration) ourselves, as Flink calls open() on every
parallel sink instance.
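
For reference, a minimal Java sketch of the same wiring with those two
changes applied (the cache name "aaa" and "ignite.xml" are taken from your
sample; the class name is only for illustration, and this is a sketch rather
than final code):

import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.ignite.sink.flink.IgniteSink;

public class WordCountToIgnite {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Cache name and Ignite config file as in the sample above.
        IgniteSink<Map<String, Integer>> sink = new IgniteSink<>("aaa", "ignite.xml");

        // Overwrite existing entries so the latest rolling count per word wins;
        // with allowOverwrite(false) the streamer keeps the first value it saw.
        sink.setAllowOverwrite(true);
        sink.setAutoFlushFrequency(1);
        // Note: no sink.open(...) call here. Flink invokes open() on every
        // parallel sink instance when the job starts.

        DataStream<Tuple2<String, Integer>> counts = env
            .fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer")
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String w : line.toLowerCase().split("\\W+"))
                        if (!w.isEmpty())
                            out.collect(Tuple2.of(w, 1));
                }
            })
            .keyBy(0)
            .sum(1);

        counts
            .map(new MapFunction<Tuple2<String, Integer>, Map<String, Integer>>() {
                @Override public Map<String, Integer> map(Tuple2<String, Integer> t) {
                    return Collections.singletonMap(t.f0, t.f1);
                }
            })
            .addSink(sink);

        env.execute("Streaming WordCount to Ignite");
    }
}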

Please take a look at the modified WordCount sample below.

https://github.com/samaitra/flink-fn/blob/master/flink-fn/src/main/scala/com/samaitra/WordCount.scala

Please review and share feedback

Regards
Saikat

On Thu, Jul 26, 2018 at 1:16 AM, Ray <ra...@cisco.com> wrote:

> Hi Saikat,
>
> The results flink calculated before sending to sink is correct, but the
> results in Ignite is not correct.
> You can remove the sink and print the stream content to validate my point.
>
>
>
> --
> Sent from: http://apache-ignite-developers.2346864.n4.nabble.com/
>

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Ray <ra...@cisco.com>.
Hi Saikat,

The results Flink calculates before sending them to the sink are correct, but
the results in Ignite are not.
You can remove the sink and print the stream content to verify this.



--
Sent from: http://apache-ignite-developers.2346864.n4.nabble.com/

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Stan

Thank you, we will continue the discussion on the dev list.

Ray - Thank you for validating the changes; I will take a look at the
WordCount application. My understanding is that the IgniteSink need not do any
reduce of its own: the output collector will reduce the results and write them
to Ignite.
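
A toy illustration of that point (plain Java, nothing Flink-specific):
keyBy(0).sum(1) already emits a running total per word, so with overwrite
enabled the last write per key leaves the final count in the cache.

import java.util.HashMap;
import java.util.Map;

public class RollingCountDemo {
    public static void main(String[] args) {
        // What keyBy(0).sum(1) would emit for the words "to be to be to".
        String[][] emitted = {{"to", "1"}, {"be", "1"}, {"to", "2"}, {"be", "2"}, {"to", "3"}};

        Map<String, Integer> cache = new HashMap<>();

        // Overwriting on every record is what allowOverwrite(true) does.
        for (String[] record : emitted)
            cache.put(record[0], Integer.parseInt(record[1]));

        System.out.println(cache.get("to") + " " + cache.get("be")); // 3 2
    }
}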

Regards,
Saikat

On Sun, Jul 22, 2018 at 5:07 PM, Stanislav Lukyanov <st...@gmail.com>
wrote:

> Hi guys,
>
> Thanks for helping with the fix!
>
> As this is a development topic now and not a usage one, I’m BCC’ing the
> user-list and replacing it with dev-list.
> Please continue the discussion there.
>
> Andrey, Dmitry, please help with the review.
>
> Thanks,
> Stan
>
> From: Saikat Maitra
> Sent: 22 July 2018, 8:28
> To: user@ignite.apache.org; rayliu@cisco.com
> Subject: Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid
>
> Hi Ray, Andrew
>
> As discussed I have fixed the issue with IgniteSink when running in
> cluster mode.
>
> Please review the below PR and share feedback.
>
> PR : https://github.com/apache/ignite/pull/4398
> Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695
>
> Regards,
> Saikat
>
>
>
>
> On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra <sa...@gmail.com>
> wrote:
> Hi Ray,
>
> Thank you for validating the changes, I see that in cluster mode when I am
> checking the IgniteSink it is working as desired. In stand alone mode I can
> see we are getting the exception class org.apache.ignite.IgniteException:
> Default Ignite instance has already been started.
>
> Please take a look into this sample application https://github.
> com/samaitra/streamers which I used to run it with flink in cluster mode.
>
> I am considering if I should make changes to run the IgniteSink in client
> mode similar to the ways flink connector for redis and flume were
> implemented in Apache Bahir
>
> https://github.com/apache/bahir-flink
>
> I will share update soon.
>
> Regards,
> Saikat
>
>
>
> On Sun, Jul 15, 2018 at 10:07 PM, Ray <ra...@cisco.com> wrote:
> Hello Saikat,
>
> I tried your newest code and wrote a simple word count application to test
> the sink.
> It appears there's still problems.
> Here's my code.
>
>
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.extensions._
> import org.apache.flink.configuration.Configuration
> import org.apache.ignite.Ignition
> import org.apache.ignite.configuration.CacheConfiguration
>
> import scala.collection.JavaConverters._
>
>
> object WordCount {
>
>         def main(args: Array[String]) {
>
>                 val ignite = Ignition.start("ignite.xml")
>                 val cacheConfig = new CacheConfiguration[Any, Any]()
>                 ignite.destroyCache("aaa")
>                 cacheConfig.setName("aaa")
>                 cacheConfig.setSqlSchema("PUBLIC")
>                 ignite.createCache(cacheConfig)
>                 ignite.close()
>
>
>                 // set up the execution environment
>                 val env = StreamExecutionEnvironment.
> getExecutionEnvironment
>
>                 val igniteSink = new IgniteSink[java.util.Map[String,
> Int]]("aaa",
> "ignite.xml")
>
>                 igniteSink.setAllowOverwrite(false)
>                 igniteSink.setAutoFlushFrequency(1)
>
>                 igniteSink.open(new Configuration)
>
>
>                 // get input data
>                 val text = env.fromElements(
>                         "To be, or not to be,--that is the question:--",
>                         "Whether 'tis nobler in the mind to suffer",
>                         "The slings and arrows of outrageous fortune",
>                         "Or to take arms against a sea of troubles,")
>
>
>                 val counts = text
>                         // split up the lines in pairs (2-tuples)
> containing: (word,1)
>                         .flatMap(_.toLowerCase.split("\\W+"))
>                         .filter(_.nonEmpty)
>                         .map((_, 1))
>                         // group by the tuple field "0" and sum up tuple
> field "1"
>                         .keyBy(0)
>                         .sum(1)
>                         // Convert to key/value format before ingesting to
> Ignite
>                         .mapWith { case (k: String, v: Int) => Map(k ->
> v).asJava }
>                         .addSink(igniteSink)
>
>                 try
>                         env.execute("Streaming WordCount1")
>                 catch {
>                         case e: Exception =>
>
>                         // Exception handling.
>                 } finally igniteSink.close()
>
>         }
> }
>
> I tried running this application in Idea and the error log snippet is as
> follows
>
> 07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched
> to
> FAILED
> class org.apache.ignite.IgniteException: Default Ignite instance has
> already
> been started.
>         at
> org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.
> java:990)
>         at org.apache.ignite.Ignition.start(Ignition.java:355)
>         at IgniteSink.open(IgniteSink.java:135)
>         at
> org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
>         at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(
> AbstractUdfStreamOperator.java:111)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite
> instance has already been started.
>         at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.
> java:1134)
>         at
> org.apache.ignite.internal.IgnitionEx.startConfigurations(
> IgnitionEx.java:1069)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:955)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:854)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:724)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:693)
>         at org.apache.ignite.Ignition.start(Ignition.java:352)
>         ... 7 more
>
> 07/16/2018 11:05:30     Job execution switched to status FAILING.
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>
>
>
>

RE: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Stanislav Lukyanov <st...@gmail.com>.
Hi guys,

Thanks for helping with the fix!

As this is now a development topic and not a usage one, I’m BCC’ing the user list and replacing it with the dev list.
Please continue the discussion there.

Andrey, Dmitry, please help with the review.

Thanks,
Stan

From: Saikat Maitra
Sent: 22 July 2018, 8:28
To: user@ignite.apache.org; rayliu@cisco.com
Subject: Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Hi Ray, Andrew

As discussed I have fixed the issue with IgniteSink when running in cluster mode.

Please review the below PR and share feedback.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat




On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra <sa...@gmail.com> wrote:
Hi Ray,

Thank you for validating the changes, I see that in cluster mode when I am checking the IgniteSink it is working as desired. In stand alone mode I can see we are getting the exception class org.apache.ignite.IgniteException: Default Ignite instance has already been started.

Please take a look into this sample application https://github.com/samaitra/streamers which I used to run it with flink in cluster mode.

I am considering if I should make changes to run the IgniteSink in client mode similar to the ways flink connector for redis and flume were implemented in Apache Bahir

https://github.com/apache/bahir-flink

I will share update soon.

Regards,
Saikat



On Sun, Jul 15, 2018 at 10:07 PM, Ray <ra...@cisco.com> wrote:
Hello Saikat,

I tried your newest code and wrote a simple word count application to test
the sink.
It appears there's still problems.
Here's my code.



import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration

import scala.collection.JavaConverters._


object WordCount {

        def main(args: Array[String]) {

                val ignite = Ignition.start("ignite.xml")
                val cacheConfig = new CacheConfiguration[Any, Any]()
                ignite.destroyCache("aaa")
                cacheConfig.setName("aaa")
                cacheConfig.setSqlSchema("PUBLIC")
                ignite.createCache(cacheConfig)
                ignite.close()


                // set up the execution environment
                val env = StreamExecutionEnvironment.getExecutionEnvironment

                val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa",
"ignite.xml")

                igniteSink.setAllowOverwrite(false)
                igniteSink.setAutoFlushFrequency(1)

                igniteSink.open(new Configuration)


                // get input data
                val text = env.fromElements(
                        "To be, or not to be,--that is the question:--",
                        "Whether 'tis nobler in the mind to suffer",
                        "The slings and arrows of outrageous fortune",
                        "Or to take arms against a sea of troubles,")


                val counts = text
                        // split up the lines in pairs (2-tuples) containing: (word,1)
                        .flatMap(_.toLowerCase.split("\\W+"))
                        .filter(_.nonEmpty)
                        .map((_, 1))
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .sum(1)
                        // Convert to key/value format before ingesting to Ignite
                        .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
                        .addSink(igniteSink)

                try
                        env.execute("Streaming WordCount1")
                catch {
                        case e: Exception =>

                        // Exception handling.
                } finally igniteSink.close()

        }
}

I tried running this application in Idea and the error log snippet is as
follows

07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched to
FAILED 
class org.apache.ignite.IgniteException: Default Ignite instance has already
been started.
        at
org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
        at org.apache.ignite.Ignition.start(Ignition.java:355)
        at IgniteSink.open(IgniteSink.java:135)
        at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite
instance has already been started.
        at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
        at
org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
        at org.apache.ignite.Ignition.start(Ignition.java:352)
        ... 7 more

07/16/2018 11:05:30     Job execution switched to status FAILING.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/




Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Ray, Andrew

As discussed I have fixed the issue with IgniteSink when running in cluster
mode.

Please review the below PR and share feedback.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat




On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra <sa...@gmail.com>
wrote:

> Hi Ray,
>
> Thank you for validating the changes, I see that in cluster mode when I am
> checking the IgniteSink it is working as desired. In stand alone mode I can
> see we are getting the exception class org.apache.ignite.IgniteException:
> Default Ignite instance has already been started.
>
> Please take a look into this sample application https://github.
> com/samaitra/streamers which I used to run it with flink in cluster mode.
>
> I am considering if I should make changes to run the IgniteSink in client
> mode similar to the ways flink connector for redis and flume were
> implemented in Apache Bahir
>
> https://github.com/apache/bahir-flink
>
> I will share update soon.
>
> Regards,
> Saikat
>
>
>
> On Sun, Jul 15, 2018 at 10:07 PM, Ray <ra...@cisco.com> wrote:
>
>> Hello Saikat,
>>
>> I tried your newest code and wrote a simple word count application to test
>> the sink.
>> It appears there's still problems.
>> Here's my code.
>>
>>
>>
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.scala.extensions._
>> import org.apache.flink.configuration.Configuration
>> import org.apache.ignite.Ignition
>> import org.apache.ignite.configuration.CacheConfiguration
>>
>> import scala.collection.JavaConverters._
>>
>>
>> object WordCount {
>>
>>         def main(args: Array[String]) {
>>
>>                 val ignite = Ignition.start("ignite.xml")
>>                 val cacheConfig = new CacheConfiguration[Any, Any]()
>>                 ignite.destroyCache("aaa")
>>                 cacheConfig.setName("aaa")
>>                 cacheConfig.setSqlSchema("PUBLIC")
>>                 ignite.createCache(cacheConfig)
>>                 ignite.close()
>>
>>
>>                 // set up the execution environment
>>                 val env = StreamExecutionEnvironment.get
>> ExecutionEnvironment
>>
>>                 val igniteSink = new IgniteSink[java.util.Map[String,
>> Int]]("aaa",
>> "ignite.xml")
>>
>>                 igniteSink.setAllowOverwrite(false)
>>                 igniteSink.setAutoFlushFrequency(1)
>>
>>                 igniteSink.open(new Configuration)
>>
>>
>>                 // get input data
>>                 val text = env.fromElements(
>>                         "To be, or not to be,--that is the question:--",
>>                         "Whether 'tis nobler in the mind to suffer",
>>                         "The slings and arrows of outrageous fortune",
>>                         "Or to take arms against a sea of troubles,")
>>
>>
>>                 val counts = text
>>                         // split up the lines in pairs (2-tuples)
>> containing: (word,1)
>>                         .flatMap(_.toLowerCase.split("\\W+"))
>>                         .filter(_.nonEmpty)
>>                         .map((_, 1))
>>                         // group by the tuple field "0" and sum up tuple
>> field "1"
>>                         .keyBy(0)
>>                         .sum(1)
>>                         // Convert to key/value format before ingesting
>> to Ignite
>>                         .mapWith { case (k: String, v: Int) => Map(k ->
>> v).asJava }
>>                         .addSink(igniteSink)
>>
>>                 try
>>                         env.execute("Streaming WordCount1")
>>                 catch {
>>                         case e: Exception =>
>>
>>                         // Exception handling.
>>                 } finally igniteSink.close()
>>
>>         }
>> }
>>
>> I tried running this application in Idea and the error log snippet is as
>> follows
>>
>> 07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched
>> to
>> FAILED
>> class org.apache.ignite.IgniteException: Default Ignite instance has
>> already
>> been started.
>>         at
>> org.apache.ignite.internal.util.IgniteUtils.convertException
>> (IgniteUtils.java:990)
>>         at org.apache.ignite.Ignition.start(Ignition.java:355)
>>         at IgniteSink.open(IgniteSink.java:135)
>>         at
>> org.apache.flink.api.common.functions.util.FunctionUtils.ope
>> nFunction(FunctionUtils.java:36)
>>         at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>> erator.open(AbstractUdfStreamOperator.java:111)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>> perators(StreamTask.java:376)
>>         at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:253)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite
>> instance has already been started.
>>         at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java
>> :1134)
>>         at
>> org.apache.ignite.internal.IgnitionEx.startConfigurations(Ig
>> nitionEx.java:1069)
>>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:
>> 955)
>>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:
>> 854)
>>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:
>> 724)
>>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:
>> 693)
>>         at org.apache.ignite.Ignition.start(Ignition.java:352)
>>         ... 7 more
>>
>> 07/16/2018 11:05:30     Job execution switched to status FAILING.
>>
>>
>>
>> --
>> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>>
>
>

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi Ray,

Thank you for validating the changes. When I check the IgniteSink in cluster
mode it works as desired. In standalone mode we get the exception class
org.apache.ignite.IgniteException: Default Ignite instance has already been
started.

Please take a look at this sample application:
https://github.com/samaitra/streamers
I used it to run the sink with Flink in cluster mode.

I am considering whether I should change the IgniteSink to run in client
mode, similar to the way the Flink connectors for Redis and Flume are
implemented in Apache Bahir:

https://github.com/apache/bahir-flink
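
Roughly, the open() of a client-mode sink could look like the sketch below
(only an illustration of the approach, not final code):

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class ClientModeSinkSketch {
    private static final Object LOCK = new Object();

    /** Start (or reuse) an Ignite client node on the task manager. */
    static Ignite igniteFor(String igniteCfgFile) {
        synchronized (LOCK) {
            if (!Ignition.allGrids().isEmpty())
                return Ignition.allGrids().get(0); // another subtask already started a node

            Ignition.setClientMode(true);          // join the grid as a client, not a server
            return Ignition.start(igniteCfgFile);  // e.g. "flink-config.xml"
        }
    }
}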

I will share an update soon.

Regards,
Saikat



On Sun, Jul 15, 2018 at 10:07 PM, Ray <ra...@cisco.com> wrote:

> Hello Saikat,
>
> I tried your newest code and wrote a simple word count application to test
> the sink.
> It appears there's still problems.
> Here's my code.
>
>
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.extensions._
> import org.apache.flink.configuration.Configuration
> import org.apache.ignite.Ignition
> import org.apache.ignite.configuration.CacheConfiguration
>
> import scala.collection.JavaConverters._
>
>
> object WordCount {
>
>         def main(args: Array[String]) {
>
>                 val ignite = Ignition.start("ignite.xml")
>                 val cacheConfig = new CacheConfiguration[Any, Any]()
>                 ignite.destroyCache("aaa")
>                 cacheConfig.setName("aaa")
>                 cacheConfig.setSqlSchema("PUBLIC")
>                 ignite.createCache(cacheConfig)
>                 ignite.close()
>
>
>                 // set up the execution environment
>                 val env = StreamExecutionEnvironment.
> getExecutionEnvironment
>
>                 val igniteSink = new IgniteSink[java.util.Map[String,
> Int]]("aaa",
> "ignite.xml")
>
>                 igniteSink.setAllowOverwrite(false)
>                 igniteSink.setAutoFlushFrequency(1)
>
>                 igniteSink.open(new Configuration)
>
>
>                 // get input data
>                 val text = env.fromElements(
>                         "To be, or not to be,--that is the question:--",
>                         "Whether 'tis nobler in the mind to suffer",
>                         "The slings and arrows of outrageous fortune",
>                         "Or to take arms against a sea of troubles,")
>
>
>                 val counts = text
>                         // split up the lines in pairs (2-tuples)
> containing: (word,1)
>                         .flatMap(_.toLowerCase.split("\\W+"))
>                         .filter(_.nonEmpty)
>                         .map((_, 1))
>                         // group by the tuple field "0" and sum up tuple
> field "1"
>                         .keyBy(0)
>                         .sum(1)
>                         // Convert to key/value format before ingesting to
> Ignite
>                         .mapWith { case (k: String, v: Int) => Map(k ->
> v).asJava }
>                         .addSink(igniteSink)
>
>                 try
>                         env.execute("Streaming WordCount1")
>                 catch {
>                         case e: Exception =>
>
>                         // Exception handling.
>                 } finally igniteSink.close()
>
>         }
> }
>
> I tried running this application in Idea and the error log snippet is as
> follows
>
> 07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched
> to
> FAILED
> class org.apache.ignite.IgniteException: Default Ignite instance has
> already
> been started.
>         at
> org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.
> java:990)
>         at org.apache.ignite.Ignition.start(Ignition.java:355)
>         at IgniteSink.open(IgniteSink.java:135)
>         at
> org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
>         at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(
> AbstractUdfStreamOperator.java:111)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite
> instance has already been started.
>         at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.
> java:1134)
>         at
> org.apache.ignite.internal.IgnitionEx.startConfigurations(
> IgnitionEx.java:1069)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:955)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:854)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:724)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:693)
>         at org.apache.ignite.Ignition.start(Ignition.java:352)
>         ... 7 more
>
> 07/16/2018 11:05:30     Job execution switched to status FAILING.
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Ray <ra...@cisco.com>.
Hello Saikat,

I tried your newest code and wrote a simple word count application to test
the sink.
It appears there are still problems.
Here's my code.



import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration

import scala.collection.JavaConverters._


object WordCount {

	def main(args: Array[String]) {

		val ignite = Ignition.start("ignite.xml")
		val cacheConfig = new CacheConfiguration[Any, Any]()
		ignite.destroyCache("aaa")
		cacheConfig.setName("aaa")
		cacheConfig.setSqlSchema("PUBLIC")
		ignite.createCache(cacheConfig)
		ignite.close()


		// set up the execution environment
		val env = StreamExecutionEnvironment.getExecutionEnvironment

		val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa",
"ignite.xml")

		igniteSink.setAllowOverwrite(false)
		igniteSink.setAutoFlushFrequency(1)

		igniteSink.open(new Configuration)


		// get input data
		val text = env.fromElements(
			"To be, or not to be,--that is the question:--",
			"Whether 'tis nobler in the mind to suffer",
			"The slings and arrows of outrageous fortune",
			"Or to take arms against a sea of troubles,")


		val counts = text
			// split up the lines in pairs (2-tuples) containing: (word,1)
			.flatMap(_.toLowerCase.split("\\W+"))
			.filter(_.nonEmpty)
			.map((_, 1))
			// group by the tuple field "0" and sum up tuple field "1"
			.keyBy(0)
			.sum(1)
			// Convert to key/value format before ingesting to Ignite
			.mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
			.addSink(igniteSink)

		try
			env.execute("Streaming WordCount1")
		catch {
			case e: Exception =>

			// Exception handling.
		} finally igniteSink.close()

	}
}

I tried running this application in IDEA, and the error log snippet is as
follows:

07/16/2018 11:05:30	aggregation -> Map -> Sink: Unnamed(4/8) switched to
FAILED 
class org.apache.ignite.IgniteException: Default Ignite instance has already
been started.
	at
org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
	at org.apache.ignite.Ignition.start(Ignition.java:355)
	at IgniteSink.open(IgniteSink.java:135)
	at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite
instance has already been started.
	at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
	at
org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
	at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
	at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
	at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
	at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
	at org.apache.ignite.Ignition.start(Ignition.java:352)
	... 7 more

07/16/2018 11:05:30	Job execution switched to status FAILING.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Saikat Maitra <sa...@gmail.com>.
Hello Ray,

I have fixed the issue related to the Flink IgniteSink. Please take a look at
the changes:
https://github.com/samaitra/ignite/blob/IGNITE-8697/modules/flink/src/main/java/org/apache/ignite/sink/flink/IgniteSink.java

I am working on the testcases and will raise the PR soon.

Regards
Saikat

On Tue, Jun 5, 2018 at 1:13 AM, Ray <ra...@cisco.com> wrote:

> Yes, the cache is already created before running my flink application.
>
> The issue can be reproduced when you submit your flink application to your
> flink cluster.
>
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Ray <ra...@cisco.com>.
Yes, the cache is already created before running my Flink application.

The issue can be reproduced when you submit the Flink application to your
Flink cluster.




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Saikat Maitra <sa...@gmail.com>.
Hi,

When the Ignite sink's data streamer starts, it checks that the cache is
already present in the grid before the streaming process can begin.

Can you please confirm that the cache was created before the data sink process
was executed?
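
For example, a minimal way to guarantee that, reusing the config file and
cache name from earlier in this thread, is to create the cache up front
before submitting the Flink job (this assumes the configuration joins an
already running cluster, so the cache survives after this helper node shuts
down):

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;

public class EnsureCacheExists {
    public static void main(String[] args) {
        // Start a node with the same configuration the sink will use and make
        // sure the target cache exists; getOrCreateCache is a no-op if it does.
        try (Ignite ignite = Ignition.start("flink-config.xml")) {
            ignite.getOrCreateCache("DemoCache");
        }
    }
}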

Regards,
Saikat

On Mon, Jun 4, 2018 at 9:24 PM, Ray <ra...@cisco.com> wrote:

> I think it's a code bug in flink sink.
> I had this same problem some time ago.
> I think it's caused by compiler optimization of variable initialization in
> multi thread environment(flink cluster mode).
> In this case, the variable "cacheName" is not initialized when being used
> because compile will optimize the variable initialize order in multi thread
> environment.
>
> I have created the ticket in jira and assigned to the author of flink sink.
>
> https://issues.apache.org/jira/browse/IGNITE-8697
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Ray <ra...@cisco.com>.
I think it's a code bug in the Flink sink.
I had this same problem some time ago.
I think it's caused by compiler optimization of variable initialization in a
multi-threaded environment (Flink cluster mode).
In this case, the variable "cacheName" is not initialized when it is used,
because the compiler will optimize the variable initialization order in a
multi-threaded environment.

I have created a ticket in JIRA and assigned it to the author of the Flink sink.

https://issues.apache.org/jira/browse/IGNITE-8697



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Burt Parkers <bu...@googlemail.com>.
Hi,

It runs fine in a single VM, but if I build the uber-jar (containing the
application and all required Ignite dependencies) and submit it to the
Flink cluster (with master and worker nodes in different VMs), the
cache configuration isn't loaded:

2018-06-01 22:05:30,665 INFO  org.apache.ignite.internal.IgniteKernal
                 - Config URL: n/a

It seems that both fields in IgniteSink:

    /** Ignite grid configuration file. */
    private static String igniteCfgFile;

    /** Cache name. */
    private static String cacheName;

are null.

Maybe the sink isn't initialized correctly after serialization from the
master to the worker node? I'm not familiar with the Kryo serialization
used by Apache Flink, but maybe static fields are ignored?
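
For what it's worth, static fields are never part of a serialized object's
state, no matter whether Java serialization or Kryo is used; they belong to
the class, which the worker JVM loads fresh with default values. A small
self-contained sketch (a hypothetical DemoSink class, not the real
IgniteSink) that mimics what happens when the sink instance travels to
another JVM:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFieldDemo {

    // Sink-like class: cacheName is static, cfgFile is an instance field.
    static class DemoSink implements Serializable {
        static String cacheName;   // never written into the serialized form
        final String cfgFile;      // travels with the serialized object

        DemoSink(String cacheName, String cfgFile) {
            DemoSink.cacheName = cacheName; // only sets the static in THIS JVM
            this.cfgFile = cfgFile;
        }
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(new DemoSink("DemoCache", "flink-config.xml"));
        }

        // Simulate the worker JVM, where the constructor never ran.
        DemoSink.cacheName = null;

        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            DemoSink copy = (DemoSink) in.readObject();
            System.out.println("cacheName = " + DemoSink.cacheName); // null
            System.out.println("cfgFile   = " + copy.cfgFile);       // flink-config.xml
        }
    }
}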

If I create my own copy of IgniteSink with hardcoded values:

        private static class Holder {
            private static final Ignite IGNITE = Ignition.start("flink-config.xml");
            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer("DemoCache");
        }

then the cache configuration is loaded and everything works as expected:

2018-06-01 22:28:12,985 INFO  org.apache.ignite.internal.IgniteKernal
                 - Config URL:
/tmp/blobStore-56228c43-2ed8-46d3-bbf1-631c3ef778b3/job_132b1e78c18e363739fede5fded4214b/blob_p-a85926793530ee5cec5e7c9372ddbfc22020a0d0-d4b791f47ff24933e8d6ff8ccc4fa665!/flink-config.xml

- Burt

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Posted by Humphrey <hm...@gmail.com>.
It is working fine here.

Tweaked the main method a bit:
    public static void main(String[] args) throws Exception {
    	System.setProperty("IGNITE_QUIET", "false");    	

        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().enableSysoutLogging();

        IgniteSink<Map<String, String>> igniteSink = new IgniteSink<>("DemoCache", "flink-config.xml");
        igniteSink.setAllowOverwrite(true);
        igniteSink.setAutoFlushFrequency(10);

        igniteSink.start();

        System.out.println("\n\nSTARTED SUCCESSFULLY\n\n");
        //DataStream<String> text = env.socketTextStream("localhost", 12200);

        //DataStream<Map<String, String>> datastream = text.flatMap(new Splitter());
        //datastream.addSink(igniteSink);

        //env.execute("Demo Streamer");
        igniteSink.stop();
    }

I added the following dependency:
        <dependency>
            <groupId>org.apache.ignite</groupId>
        	<artifactId>ignite-log4j</artifactId>	
            <version>${ignite.version}</version>
        </dependency>

The log:

>>> +----------------------------------------------------------------------+
>>> Ignite ver. 2.5.0#20180523-sha1:86e110c750a340dc9be2d396415f0b80d7ed8813
>>> +----------------------------------------------------------------------+
>>> OS name: Windows 7 6.1 amd64
>>> CPU(s): 4
>>> Heap: 3.5GB
>>> VM name: 13908@WPU8L0031201
>>> Local node [ID=C91AF628-B784-4CB9-8FF9-AA2E0F04D303, order=1,
>>> clientMode=false]
>>> Local node addresses: [WPU8L0031201.ad.ing.net/0:0:0:0:0:0:0:1,
>>> WPU8L0031201.ad.ing.net/127.0.0.1, WPU8L0031201.ad.ing.net/192.168.1.69,
>>> /192.168.56.1, /192.168.99.1]
>>> Local ports: TCP:10800 TCP:11211 TCP:47100 TCP:47500 

17:27:42,928 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  -
Topology snapshot [ver=1, servers=1, clients=0, CPUs=4, offheap=5.1GB,
heap=3.5GB]
17:27:42,928 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  -   ^--
Node [id=C91AF628-B784-4CB9-8FF9-AA2E0F04D303, clusterState=ACTIVE]
17:27:42,928 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  - Data
Regions Configured:
17:27:42,930 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  -   ^--
default [initSize=256,0 MiB, maxSize=3,1 GiB, persistenceEnabled=false]
17:27:42,930 INFO 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager  -   ^--
Data_Region [initSize=1,0 GiB, maxSize=2,0 GiB, persistenceEnabled=false]


STARTED SUCCESSFULLY


17:27:43,105 INFO 
org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestProtocol 
- Command protocol successfully stopped: TCP binary
17:27:43,159 INFO 
org.apache.ignite.internal.processors.cache.GridCacheProcessor  - Stopped
cache [cacheName=DemoCache]
17:27:43,159 INFO 
org.apache.ignite.internal.processors.cache.GridCacheProcessor  - Stopped
cache [cacheName=ignite-sys-cache]
17:27:43,238 INFO  org.apache.ignite.internal.IgniteKernal                      
- 

>>> +---------------------------------------------------------------------------------+
>>> Ignite ver. 2.5.0#20180523-sha1:86e110c750a340dc9be2d396415f0b80d7ed8813
>>> stopped OK
>>> +---------------------------------------------------------------------------------+
>>> Grid uptime: 00:00:00.306



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/