Posted to user@beam.apache.org by amir bahmanyari <am...@yahoo.com> on 2016/05/08 04:34:44 UTC

How to set groupId in KafkaIO.Read()

Sorry missed the subject! :)

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org> 
 Sent: Saturday, May 7, 2016 9:31 PM
 Subject: 
   
Hi colleagues,
Hope you are having a great weekend. I get a Kafka configuration exception, group.id not being set, when trying to run my Beam app (Flink runner) in a Flink cluster.

I couldn't find a reference to a method that sets a group.id, similar to what we do with withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords(...), etc. The bottom of the stack trace is provided below. How can I set a group.id property for KafkaIO.read()? Thanks for your help.
        ... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
        ... 27 more


  

Re: How to set groupId in KafkaIO.Read()

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
I switched to FlinkPipelineOptions instead of my Options interface. The FlinkPipelineRunner invokes a method in org/apache/beam/runners/dataflow/DataflowPipelineRunner.
Which Beam jar file contains this DataflowPipelineRunner class?
Caused by: java.lang.NoClassDefFoundError: org/apache/beam/runners/dataflow/DataflowPipelineRunner
        at org.apache.beam.runners.flink.FlinkPipelineRunner.fromOptions(FlinkPipelineRunner.java:8
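For reference, one thing worth checking is whether the runner is being pinned explicitly in the options. A minimal sketch, assuming the Flink runner artifacts are on the classpath; the class names below appear in the stack traces in this thread, the helper name is illustrative:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerSetup {
  public static Pipeline createFlinkPipeline(String[] args) {
    // Parse the command-line args into FlinkPipelineOptions and set the runner
    // explicitly instead of relying on any default runner resolution.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkPipelineRunner.class);
    return Pipeline.create(options);
  }
}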
Thanks so much for your time.
AB

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>; Raghu Angadi <ra...@google.com> 
 Sent: Sunday, May 8, 2016 11:37 PM
 Subject: Re: How to set groupId in KafkaIO.Read()
   
Hi Raghu,
I got past this exception and many more by placing the required jars in the Flink lib folder. I MUST have the following jars in the Flink lib folder to avoid such runtime exceptions:

I now get a different issue. I use an Options interface in my code, following a Beam Kafka example. It is exactly the same as in the published example:
public static interface Options extends PipelineOptions, FlinkPipelineOptions {
  @Description("Path of the file to write to")
  @Validation.Required
  String getOutput();
  void setOutput(String value);
}
When deploying my Beam app fat jar to the Flink cluster, it complains that Options is not visible to the class loader. Is there a property I need to set to make Options available to the class loader? I tried both public and public static, a different name, etc.; same issue. Following is the stack trace. I appreciate your help.
Cheers
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1184)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1234)
Caused by: java.lang.IllegalArgumentException: interface com.myco.tech.arc.ReadFromKafka2$Options is not visible from class loader
        at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:581)
        at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:557)
        at java.lang.reflect.WeakCache$Factory.get(WeakCache.java:230)
        at java.lang.reflect.WeakCache.get(WeakCache.java:127)
        at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:419)
        at java.lang.reflect.Proxy.getProxyClass(Proxy.java:371)
        at org.apache.beam.sdk.options.PipelineOptionsFactory.validateWellFormed(PipelineOptionsFactory.java:620)
        at org.apache.beam.sdk.options.ProxyInvocationHandler.as(ProxyInvocationHandler.java:208)
        at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:297)
        at org.apache.beam.sdk.options.PipelineOptionsFactory.as(PipelineOptionsFactory.java:130)
        at com.walmart.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:125)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        ... 6 more
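For reference, a minimal sketch of one way to declare such an options interface as a public top-level type so the PipelineOptions proxy machinery can load it; the interface name is illustrative, not the one used in the thread:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

// Declared public and top-level (in its own .java file) so that the classloader
// which builds the options proxy can see it.
public interface ReadFromKafkaOptions extends PipelineOptions, FlinkPipelineOptions {
  @Description("Path of the file to write to")
  @Validation.Required
  String getOutput();
  void setOutput(String value);
}

It can then be registered before parsing, e.g. PipelineOptionsFactory.register(ReadFromKafkaOptions.class), followed by PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadFromKafkaOptions.class).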


      From: amir bahmanyari <am...@yahoo.com>
 To: Raghu Angadi <ra...@google.com> 
Cc: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
 Sent: Sunday, May 8, 2016 7:48 PM
 Subject: Re: How to set groupId in KafkaIO.Read()
  
Yes, I am wondering too. It happens when executing p.run(). What checks can I do? Following is the complete stack trace, FYI. Thanks very much for your attention, Raghu.
java.lang.RuntimeException: Pipeline execution failed
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:119)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:51)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
        at com.walmart.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:321)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1184)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1234)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 2acc4c7e104ea9695a09eda8bc7a6d36 (readfromkafka2-abahman-0509024419)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:146)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:116)
        ... 14 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 2acc4c7e104ea9695a09eda8bc7a6d36 (readfromkafka2-abahman-0509024419)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1223)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:469)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:113)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not create input splits from Source.
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:701)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1147)
        ... 23 more
Caused by: java.io.IOException: Could not create input splits from Source.
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:109)
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:41)
        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
        ... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)
        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
        ... 27 more


      From: Raghu Angadi <ra...@google.com>
 To: amir bahmanyari <am...@yahoo.com> 
Cc: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
 Sent: Sunday, May 8, 2016 6:35 PM
 Subject: Re: How to set groupId in KafkaIO.Read()
  
The main issue is why you are seeing this exception in the first place. You don't have to provide a group id.

On Sunday, May 8, 2016, Raghu Angadi <ra...@google.com> wrote:

Please see the javadoc for the method. You can create the map any way you like. Do you need help creating a map in Java?
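For anyone following along, the map can also be built with plain JDK collections, which avoids depending on Guava's ImmutableMap at all. A minimal sketch, assuming the consumer-property method accepts a Map<String, Object>; the helper name is illustrative:

import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {
  // Builds the extra Kafka consumer properties; group.id is optional for KafkaIO.
  public static Map<String, Object> withGroupId(String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put("group.id", groupId);
    return props;
  }
}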

On Sunday, May 8, 2016, amir bahmanyari <am...@yahoo.com> wrote:

Hi Raghu,
I am using Kafka 0.9.0. I added updateConsumerProperties(ImmutableMap.of("group.id", "myGroup")). I get "ImmutableMap not resolved".
I tried many ways to put the right jar on the classpath to resolve it. That didn't cut it.
Thanks.
What version of Kafka are you using? KafkaIO does not need you to set a group.id. Not sure why you are seeing this exception.
Though KafkaIO does not need it, you can set it:  KafkaIO.read().updateConsumerProperties(ImmutableMap.of("group.id", "temp"));
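Putting the pieces of this thread together, a minimal sketch of the read configuration being discussed. The broker address, topic name, group id and record limit are placeholders, and the method is written here as updateConsumerProperties, which I believe is the KafkaIO spelling (the thread snippets appear to contain a typo):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Optional consumer properties; KafkaIO itself does not require group.id.
    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put("group.id", "temp");

    p.apply(KafkaIO.read()
        .withBootstrapServers("host:9092")
        .withTopics(Collections.singletonList("myTopic"))
        .updateConsumerProperties(consumerProps)
        .withMaxNumRecords(1000));

    p.run();
  }
}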


On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry missed the subject! :)

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org> 
 Sent: Saturday, May 7, 2016 9:31 PM
 Subject: 
  
Hi colleagues,
Hope you are having a great weekend. I get a Kafka configuration exception, group.id not being set, when trying to run my Beam app (Flink runner) in a Flink cluster.

I couldn't find a reference to a method that sets a group.id, similar to what we do with withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords(...), etc. The bottom of the stack trace is provided below. How can I set a group.id property for KafkaIO.read()? Thanks for your help.
        ... 25 moreCaused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)        ... 27 more


   



   



   

   

  

Re: How to set groupId in KafkaIO.Read()

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
I got past this exception and many more by placing the required jars in the Flink lib folder. I MUST have the following jars in the Flink lib folder to avoid such runtime exceptions:

I now get a different issue. I use an Options interface in my code, following a Beam Kafka example. It is exactly the same as in the published example:
public static interface Options extends PipelineOptions, FlinkPipelineOptions {
  @Description("Path of the file to write to")
  @Validation.Required
  String getOutput();
  void setOutput(String value);
}
When deploying my Beam app fat jar to the Flink cluster, it complains that Options is not visible to the class loader. Is there a property I need to set to make Options available to the class loader? I tried both public and public static, a different name, etc.; same issue. Following is the stack trace. I appreciate your help.
Cheers
The program finished with the following exception:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1184)        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1234)Caused by: java.lang.IllegalArgumentException: interface com.myco.tech.arc.ReadFromKafka2$Options is not visible from class loader        at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:581)        at java.lang.reflect.Proxy$ProxyClassFactory.apply(Proxy.java:557)        at java.lang.reflect.WeakCache$Factory.get(WeakCache.java:230)        at java.lang.reflect.WeakCache.get(WeakCache.java:127)        at java.lang.reflect.Proxy.getProxyClass0(Proxy.java:419)        at java.lang.reflect.Proxy.getProxyClass(Proxy.java:371)        at org.apache.beam.sdk.options.PipelineOptionsFactory.validateWellFormed(PipelineOptionsFactory.java:620)        at org.apache.beam.sdk.options.ProxyInvocationHandler.as(ProxyInvocationHandler.java:208)        at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:297)        at org.apache.beam.sdk.options.PipelineOptionsFactory.as(PipelineOptionsFactory.java:130)        at com.walmart.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:125)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:498)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)        ... 6 more


      From: amir bahmanyari <am...@yahoo.com>
 To: Raghu Angadi <ra...@google.com> 
Cc: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
 Sent: Sunday, May 8, 2016 7:48 PM
 Subject: Re: How to set groupId in KafkaIO.Read()
   
Yes, I am wondering too. It happens when executing p.run(). What checks can I do? Following is the complete stack trace, FYI. Thanks very much for your attention, Raghu.
 java.lang.RuntimeException: Pipeline execution failed        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:119)        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:51)        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)        at com.walmart.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:321)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:498)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1184)        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1234)Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 2acc4c7e104ea9695a09eda8bc7a6d36 (readfromkafka2-abahman-0509024419)        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:146)        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:116)        ... 
14 moreCaused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 2acc4c7e104ea9695a09eda8bc7a6d36 (readfromkafka2-abahman-0509024419)        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1223)        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:469)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:113)        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)        at akka.actor.ActorCell.invoke(ActorCell.scala:487)        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)        at akka.dispatch.Mailbox.run(Mailbox.scala:221)        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not create input splits from Source.        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:701)        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1147)        ... 23 moreCaused by: java.io.IOException: Could not create input splits from Source.        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:109)        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:41)        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)        ... 25 moreCaused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.        
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)        ... 27 more


      From: Raghu Angadi <ra...@google.com>
 To: amir bahmanyari <am...@yahoo.com> 
Cc: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
 Sent: Sunday, May 8, 2016 6:35 PM
 Subject: Re: How to set groupId in KafkaIO.Read()
  
The main issue is why you are seeing this exception in the first place. You don't have to provide a group id.

On Sunday, May 8, 2016, Raghu Angadi <ra...@google.com> wrote:

Please see the javadoc for the method. You can create the map any way you like. Do you need help creating a map in Java?

On Sunday, May 8, 2016, amir bahmanyari <am...@yahoo.com> wrote:

Hi Raghu,
I am using Kafka 0.9.0. I added updateConsumerProperties(ImmutableMap.of("group.id", "myGroup")). I get "ImmutableMap not resolved".
I tried many ways to put the right jar on the classpath to resolve it. That didn't cut it.
Thanks.
What version of Kafka are you using? KafkaIO does not need you to set a group.id. Not sure why you are seeing this exception.
Though KafkaIO does not need it, you can set it:  KafkaIO.read().updateConsumerProperties(ImmutableMap.of("group.id", "temp"));


On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry missed the subject! :)

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org> 
 Sent: Saturday, May 7, 2016 9:31 PM
 Subject: 
  
Hi colleagues,
Hope you are having a great weekend. I get a Kafka configuration exception, group.id not being set, when trying to run my Beam app (Flink runner) in a Flink cluster.

I couldn't find a reference to a method that sets a group.id, similar to what we do with withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords(...), etc. The bottom of the stack trace is provided below. How can I set a group.id property for KafkaIO.read()? Thanks for your help.
        ... 25 moreCaused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)        ... 27 more


   



   



   

  

Re: How to set groupId in KafkaIO.Read()

Posted by amir bahmanyari <am...@yahoo.com>.
Yes, I am wondering too. It happens when executing p.run(). What checks can I do? Following is the complete stack trace, FYI. Thanks very much for your attention, Raghu.
 java.lang.RuntimeException: Pipeline execution failed        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:119)        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:51)        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)        at com.walmart.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:321)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:498)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1184)        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1234)Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job 2acc4c7e104ea9695a09eda8bc7a6d36 (readfromkafka2-abahman-0509024419)        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:146)        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:116)        ... 
14 moreCaused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 2acc4c7e104ea9695a09eda8bc7a6d36 (readfromkafka2-abahman-0509024419)        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1223)        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:469)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:113)        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)        at akka.actor.ActorCell.invoke(ActorCell.scala:487)        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)        at akka.dispatch.Mailbox.run(Mailbox.scala:221)        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not create input splits from Source.        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:701)        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1147)        ... 23 moreCaused by: java.io.IOException: Could not create input splits from Source.        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:109)        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:41)        at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)        ... 25 moreCaused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.        
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)        ... 27 more


      From: Raghu Angadi <ra...@google.com>
 To: amir bahmanyari <am...@yahoo.com> 
Cc: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
 Sent: Sunday, May 8, 2016 6:35 PM
 Subject: Re: How to set groupId in KafkaIO.Read()
   
The main issue is why you are seeing this exception in the first place. You don't have to provide a group id.

On Sunday, May 8, 2016, Raghu Angadi <ra...@google.com> wrote:

Please see the javadoc for the method. You can create the map any way you like. Do you need help creating a map in Java?

On Sunday, May 8, 2016, amir bahmanyari <am...@yahoo.com> wrote:

Hi Raghu,
I am using Kafka 0.9.0. I added updateConsumerProperties(ImmutableMap.of("group.id", "myGroup")). I get "ImmutableMap not resolved".
I tried many ways to put the right jar on the classpath to resolve it. That didn't cut it.
Thanks.
What version of Kafka are you using? KafkaIO does not need you to set a group.id. Not sure why you are seeing this exception.
Though KafkaIO does not need it, you can set it:  KafkaIO.read().updateConsumerProperties(ImmutableMap.of("group.id", "temp"));


On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry missed the subject! :)

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org> 
 Sent: Saturday, May 7, 2016 9:31 PM
 Subject: 
  
Hi colleagues,
Hope you are having a great weekend. I get a Kafka configuration exception, group.id not being set, when trying to run my Beam app (Flink runner) in a Flink cluster.

I couldn't find a reference to a method that sets a group.id, similar to what we do with withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords(...), etc. The bottom of the stack trace is provided below. How can I set a group.id property for KafkaIO.read()? Thanks for your help.
        ... 25 moreCaused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)        ... 27 more


   



   



  

Re: How to set groupId in KafkaIO.Read()

Posted by Raghu Angadi <ra...@google.com>.
The main issue is why you are seeing this exception in the first place. You
don't have to provide a group id.

On Sunday, May 8, 2016, Raghu Angadi <ra...@google.com> wrote:

> Please see the javadoc for the method. You can create the map any way you like.
> Do you need help creating a map in Java?
>
> On Sunday, May 8, 2016, amir bahmanyari <amirtousa@yahoo.com> wrote:
>
>> Hi Raghu,
>> Am using Kafka 0.9.0. Added updateConsumerProperites(*ImmutableMap*.of("
>> group.id", "myGroup").
>> I get *ImmutableMap *not resolved
>>
>> tried many ways to put the right jar to resolve it. Didnt cut it.
>>
>> Thanks.
>> ------------------------------
>>
>> What version of Kafka are you using? KafkaIO does not need you to set a
>> group.id. Not sure why you are seeing this exception.
>>
>> Though KafkaIO does not need it, you can set it:
>>   KafkaIO.read().updateConsumerProperites(ImmutableMap.of("group.id",
>> "temp"));
>>
>>
>>
>> On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <am...@yahoo.com>
>> wrote:
>>
>> Sorry missed the subject! :)
>>
>>
>> ------------------------------
>> *From:* amir bahmanyari <am...@yahoo.com>
>> *To:* "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
>> *Sent:* Saturday, May 7, 2016 9:31 PM
>> *Subject:*
>>
>> Hi colleagues,
>> Hope you are having a great weekend.
>> I get a Kafka configuration exception, GroupId not being set,  when
>> trying to run my Beam app (Flink runner) in a Flink cluster.
>>
>>
>> I couldn't find a reference to a method that seta a GroupId  similar to
>> what we do for
>> withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc.
>> The bottom of the stack trace is provide below.
>> How can I set a GroupId property for KafkaIO.read()?
>> Thanks for your help.
>>
>>         ... 25 more
>> Caused by:* org.apache.kafka.common.config.ConfigException: Missing
>> required configuration "group.id <http://group.id/>" which has no default
>> value.*
>>         at
>> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>>         at
>> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
>>         at org.apache.kafka.clients.consumer.*ConsumerConfig*
>> .<init>(ConsumerConfig.java:194)
>>         at org.apache.kafka.clients.consumer.*KafkaConsumer*
>> .<init>(KafkaConsumer.java:380)
>>         at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
>>         at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
>>         at org.apache.beam.sdk.io.*kafka.KafkaIO*
>> $Read$1.apply(KafkaIO.java:339)
>>         at
>> org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>>         at
>> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>>         at
>> org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
>>         at
>> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
>>         ... 27 more
>>
>>
>>
>>
>>
>>
>>

Re: How to set groupId in KafkaIO.Read()

Posted by Raghu Angadi <ra...@google.com>.
Please see the javadoc for the method. You can create the map any way you like. Do
you need help creating a map in Java?

On Sunday, May 8, 2016, amir bahmanyari <am...@yahoo.com> wrote:

> Hi Raghu,
> Am using Kafka 0.9.0. Added updateConsumerProperites(*ImmutableMap*.of("
> group.id", "myGroup").
> I get *ImmutableMap *not resolved
>
> tried many ways to put the right jar to resolve it. Didnt cut it.
>
> Thanks.
> ------------------------------
>
> What version of Kafka are you using? KafkaIO does not need you to set a
> group.id. Not sure why you are seeing this exception.
>
> Though KafkaIO does not need it, you can set it:
>   KafkaIO.read().updateConsumerProperites(ImmutableMap.of("group.id",
> "temp"));
>
>
>
> On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <amirtousa@yahoo.com> wrote:
>
> Sorry missed the subject! :)
>
>
> ------------------------------
> *From:* amir bahmanyari <amirtousa@yahoo.com>
> *To:* "user@beam.incubator.apache.org" <user@beam.incubator.apache.org>
> *Sent:* Saturday, May 7, 2016 9:31 PM
> *Subject:*
>
> Hi colleagues,
> Hope you are having a great weekend.
> I get a Kafka configuration exception, GroupId not being set,  when trying
> to run my Beam app (Flink runner) in a Flink cluster.
>
>
> I couldn't find a reference to a method that seta a GroupId  similar to
> what we do for
> withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc.
> The bottom of the stack trace is provide below.
> How can I set a GroupId property for KafkaIO.read()?
> Thanks for your help.
>
>         ... 25 more
> Caused by:* org.apache.kafka.common.config.ConfigException: Missing
> required configuration "group.id <http://group.id/>" which has no default
> value.*
>         at
> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>         at
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
>         at org.apache.kafka.clients.consumer.*ConsumerConfig*
> .<init>(ConsumerConfig.java:194)
>         at org.apache.kafka.clients.consumer.*KafkaConsumer*
> .<init>(KafkaConsumer.java:380)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
>         at org.apache.beam.sdk.io.*kafka.KafkaIO*
> $Read$1.apply(KafkaIO.java:339)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>         at
> org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
>         at
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
>         ... 27 more
>
>
>
>
>
>
>

Re: How to set groupId in KafkaIO.Read()

Posted by amir bahmanyari <am...@yahoo.com>.
Hi Raghu,
I am using Kafka 0.9.0. I added updateConsumerProperties(ImmutableMap.of("group.id", "myGroup")). I get "ImmutableMap not resolved".
I tried many ways to put the right jar on the classpath to resolve it. That didn't cut it.
Thanks.

      From: Raghu Angadi <ra...@google.com>
 To: user@beam.incubator.apache.org; amir bahmanyari <am...@yahoo.com> 
 Sent: Saturday, May 7, 2016 9:50 PM
 Subject: Re: How to set groupId in KafkaIO.Read()
   
What version of Kafka are you using? KafkaIO does not need you to set a group.id. Not sure why you are seeing this exception.
Though KafkaIO does not need it, you can set it:  KafkaIO.read().updateConsumerProperties(ImmutableMap.of("group.id", "temp"));


On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <am...@yahoo.com> wrote:

Sorry missed the subject! :)

      From: amir bahmanyari <am...@yahoo.com>
 To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org> 
 Sent: Saturday, May 7, 2016 9:31 PM
 Subject: 
  
Hi colleagues,
Hope you are having a great weekend. I get a Kafka configuration exception, group.id not being set, when trying to run my Beam app (Flink runner) in a Flink cluster.

I couldn't find a reference to a method that sets a group.id, similar to what we do with withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords(...), etc. The bottom of the stack trace is provided below. How can I set a group.id property for KafkaIO.read()? Thanks for your help.
        ... 25 moreCaused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "group.id" which has no default value.        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:339)        at org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)        at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)        at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)        at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)        ... 27 more


   



  

Re: How to set groupId in KafkaIO.Read()

Posted by Raghu Angadi <ra...@google.com>.
What version of Kafka are you using? KafkaIO does not need you to set a
group.id. Not sure why you are seeing this exception.

Though KafkaIO does not need it, you can set it:
  KafkaIO.read().updateConsumerProperties(ImmutableMap.of("group.id",
"temp"));



On Sat, May 7, 2016 at 9:34 PM, amir bahmanyari <am...@yahoo.com> wrote:

> Sorry missed the subject! :)
>
>
> ------------------------------
> *From:* amir bahmanyari <am...@yahoo.com>
> *To:* "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> *Sent:* Saturday, May 7, 2016 9:31 PM
> *Subject:*
>
> Hi colleagues,
> Hope you are having a great weekend.
> I get a Kafka configuration exception, GroupId not being set,  when trying
> to run my Beam app (Flink runner) in a Flink cluster.
>
>
> I couldn't find a reference to a method that seta a GroupId  similar to
> what we do for
> withBootstrapServers("host:9092").withTopics(topics).withMaxNumRecords etc.
> The bottom of the stack trace is provide below.
> How can I set a GroupId property for KafkaIO.read()?
> Thanks for your help.
>
>         ... 25 more
> Caused by:* org.apache.kafka.common.config.ConfigException: Missing
> required configuration "group.id <http://group.id>" which has no default
> value.*
>         at
> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
>         at
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
>         at org.apache.kafka.clients.consumer.*ConsumerConfig*
> .<init>(ConsumerConfig.java:194)
>         at org.apache.kafka.clients.consumer.*KafkaConsumer*
> .<init>(KafkaConsumer.java:380)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
>         at org.apache.beam.sdk.io.*kafka.KafkaIO*
> $Read$1.apply(KafkaIO.java:339)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$Read$1.apply(KafkaIO.java:337)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:572)
>         at
> org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter.splitIntoBundles(BoundedReadFromUnboundedSource.java:165)
>         at
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:101)
>         ... 27 more
>
>
>
>