Posted to user@flink.apache.org by sohimankotia <so...@gmail.com> on 2017/08/23 05:34:47 UTC

Reset Kafka Consumer using Flink Consumer 10 API

Hi,

I am trying to replay Kafka logs from a specific offset, but I am not able to
make it work.


Using this reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

My code:



import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.*;


public class ReplayTest {

	public static void main(String[] args) throws Exception {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1);

		final FlinkKafkaConsumer010<String> kafkaSource = getKafkaSource();
		final DataStreamSource<String> in = env.addSource(kafkaSource);

		in.addSink(new PrintSinkFunction<>());
		in.addSink(getKafkaSink());

		env.execute();
	}

	private static FlinkKafkaConsumer010<String> getKafkaSource() {
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "localhost:9092");
		properties.setProperty("zookeeper.connect", "localhost:8081");
		properties.setProperty("group.id", "test11");
		final FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties);
		HashMap<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
		specificStartOffsets.put(new KafkaTopicPartition("test", 0), 2L);
		consumer.restoreState(specificStartOffsets);
		return consumer;
	}

	private static FlinkKafkaProducer010<String> getKafkaSink() {
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "localhost:9092");
		return new FlinkKafkaProducer010<>("test2", new SimpleStringSchema(), properties);
	}
}


I am using <flink.version>1.2.1</flink.version> for all Flink dependencies.

When I run the code in the IDE or on a local Flink setup, I get:


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
	at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: java.lang.IllegalStateException: The runtime context has not been initialized.
	at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.restoreState(FlinkKafkaConsumerBase.java:388)
	at in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.getKafkaSource(ReplayTest.java:50)
	at in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.main(ReplayTest.java:27)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
	... 13 more


Thanks and Regards
Sohanvir


Re: Reset Kafka Consumer using Flink Consumer 10 API

Posted by sohimankotia <so...@gmail.com>.
Thanks for the reply, Robert.

How do I specify the start position of the consumer for FlinkKafkaConsumer010?

The methods such as setStartFromSpecificOffsets described in the documentation
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-consumers-start-position-configuration>
are not present in *FlinkKafkaConsumer010*.
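
For reference, this is the usage I was trying to follow from the linked
documentation (only a sketch on my side; it assumes the 1.3.x connector,
where FlinkKafkaConsumerBase exposes the start-position setters, and reuses
the topic, partition, and offset from my original code):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test11");

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties);

// Partition 0 of topic "test", starting from offset 2 (same values as in my code).
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("test", 0), 2L);

// Public start-position setter from the 1.3 connector docs; this is what I
// would call instead of restoreState(), but it does not exist in 1.2.1.
consumer.setStartFromSpecificOffsets(specificStartOffsets);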


Re: Reset Kafka Consumer using Flink Consumer 10 API

Posted by Robert Metzger <rm...@apache.org>.
Hi,

it seems from the stack trace that you are calling the restoreState()
method yourself in ReplayTest.getKafkaSource(ReplayTest.java:50):

org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.restoreState(FlinkKafkaConsumerBase.java:388)
        at in.dailyhunt.cis.enrichment.pings.simulate.ReplayTest.getKafkaSource(ReplayTest.java:50)


This method is actually an internal method that the system calls on the task
managers when restoring state. You are calling it on the client when
submitting the Flink job. That's why you get "The runtime context has not
been initialized."

