Posted to user@flink.apache.org by Janardhan Reddy <ja...@olacabs.com> on 2016/08/10 19:46:56 UTC

flink no class found error

Hi,

We are getting the following errors when submitting Flink jobs to the
cluster.

1. Caused by: java.lang.NoSuchMethodError:
com.google.common.io.Resources.asCharSource

2. This is for an entirely different job:
Caused by: java.lang.UnsupportedClassVersionError:
com/olacabs/fabric/common/Metadata : Unsupported major.minor version 52.0

But when we run Flink locally, there is no error in either job.

Re: flink no class found error

Posted by Robert Metzger <rm...@apache.org>.
Hi,
this page explains how to relocate classes in a fat jar:
https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html

Regards,
Robert
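
For illustration, a minimal relocation block for the maven-shade-plugin, roughly what the page above describes (the shaded package prefix is a placeholder; any namespace owned by the application works):

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <relocations>
              <relocation>
                <!-- move Guava out of the way of the version on the cluster classpath -->
                <pattern>com.google.common</pattern>
                <!-- placeholder namespace; use one your application owns -->
                <shadedPattern>myapp.shaded.com.google.common</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>

Since the plugin rewrites bytecode references in every class packed into the fat jar, this also covers libraries that use Guava internally.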


On Wed, Aug 10, 2016 at 10:31 PM, Janardhan Reddy <
janardhan.reddy@olacabs.com> wrote:

> We don't use Guava directly; we use another library that uses Guava
> internally. How do we use the shade plugin in this case?
>
> On Thu, Aug 11, 2016 at 1:37 AM, Janardhan Reddy <
> janardhan.reddy@olacabs.com> wrote:
>
>> I have cross-checked that all our YARN nodes have Java 1.8 installed, but
>> we are still getting the error: Unsupported major.minor version 52.0
>>
>> On Thu, Aug 11, 2016 at 1:35 AM, Janardhan Reddy <
>> janardhan.reddy@olacabs.com> wrote:
>>
>>> Can you please explain a bit more about the last option? We are using
>>> YARN, so Guava might be on some classpath.
>>>
>>> On Thu, Aug 11, 2016 at 1:29 AM, Robert Metzger <rm...@apache.org>
>>> wrote:
>>>
>>>> Can you check if the jar you are submitting to the cluster contains a
>>>> different Guava than you use at compile time?
>>>>
>>>> Also, it might happen that Guava is already on your classpath, for
>>>> example on some YARN setups.
>>>>
>>>> The last resort to resolve these issues is to use the
>>>> maven-shade-plugin and relocate the Guava version you need into your own
>>>> namespace.
>>>>
>>>> On Wed, Aug 10, 2016 at 9:56 PM, Janardhan Reddy <
>>>> janardhan.reddy@olacabs.com> wrote:
>>>>
>>>>> #1 is thrown from user code.
>>>>>
>>>>> We use Hadoop 2.7, which uses Guava 11.2, but our application uses 18.0.
>>>>> I think Hadoop's Guava is getting picked up instead of ours.
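
A quick way to confirm which Guava wins at runtime is to print where the class named in the error was loaded from; a diagnostic sketch, for illustration only:

    // Prints the jar from which Guava's Resources class was loaded,
    // showing whether Hadoop's copy or the application's copy was picked up.
    System.out.println(com.google.common.io.Resources.class
            .getProtectionDomain().getCodeSource().getLocation());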
>>>>>
>>>>> On Thu, Aug 11, 2016 at 1:24 AM, Robert Metzger <rm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Janardhan,
>>>>>>
>>>>>> #1 Is the exception thrown from your user code, or from Flink?
>>>>>>
>>>>>> #2 is most likely caused by a compiler / runtime version mismatch:
>>>>>> http://stackoverflow.com/questions/10382929/how-to-fix-java-lang-unsupportedclassversionerror-unsupported-major-minor-versi
>>>>>> You compiled the code with Java 8, but you are trying to run it with an
>>>>>> older JVM.
>>>>>>

Re: flink no class found error

Posted by Janardhan Reddy <ja...@olacabs.com>.
We don't use Guava directly; we use another library that uses Guava
internally. How do we use the shade plugin in this case?
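
One standard way to find which library pulls in Guava transitively is Maven's dependency tree, run in the job's project directory:

    $ mvn dependency:tree -Dincludes=com.google.guava:guava

Relocating via the shade plugin works the same way for transitive dependencies, since the rewrite is applied to the packaged classes, not to the pom.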


Re: flink no class found error

Posted by Janardhan Reddy <ja...@olacabs.com>.
I have cross-checked that all our YARN nodes have Java 1.8 installed, but we
are still getting the error: Unsupported major.minor version 52.0
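
As a sanity check, the class file version can be inspected directly with javap; a sketch, using the class from the error above (the jar name is a placeholder):

    $ javap -verbose -cp fabric-job.jar com.olacabs.fabric.common.Metadata | grep 'major version'
      major version: 52

Major version 52 corresponds to Java 8, so the error means some JVM on the execution path is older than 1.8, for example if the YARN containers are started with a different JAVA_HOME than the one checked on the nodes.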


Re: flink - Working with State example

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
You mean the directory is completely empty? Can you check in the JobManager
dashboard whether it reports any successful checkpoints for the job? One
possible explanation is an optimization that the FsStateBackend performs:
when the state is very small, it is not actually written to files but stored
in the metadata of the checkpoint that is sent to the JobManager. That would
explain why there are no files. You can set the threshold size for this
optimization with an additional FsStateBackend constructor parameter, e.g.
new FsStateBackend("file:///home/buvana/flink/checkpoints", 0) to disable it.

Cheers,
Aljoscha
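
A sketch of the adjusted setup following that suggestion (paths as in the code earlier in the thread):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // A threshold of 0 bytes forces even very small state to be written to
    // checkpoint files instead of being inlined into the metadata that is
    // sent to the JobManager.
    env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints", 0));
    env.enableCheckpointing(10000);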

On Fri, 12 Aug 2016 at 21:12 Ramanan, Buvana (Nokia - US) <
buvana.ramanan@nokia-bell-labs.com> wrote:

> Hi Kostas,
>
> I am trying to use FsStateBackend as the backend for storing state, and I
> configure it as follows in the code:
>
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
>        env.enableCheckpointing(10000);
>
> Everything else is the same as the code I shared with you previously.
>
> When I execute, I see that a directory is created under
> /home/buvana/flink/checkpoints, but there is nothing under that directory.
> I was expecting to find some file or subdirectory there.
>
> Please explain.
>
> Thanks,
> Buvana
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:k.kloudas@data-artisans.com]
> Sent: Friday, August 12, 2016 1:37 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
>
> No problem!
>
> Regards,
> Kostas
>
> > On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) <
> buvana.ramanan@nokia-bell-labs.com> wrote:
> >
> > Kostas,
> > Good catch! That makes it work! Thank you so much for the help.
> > Regards,
> > Buvana
> >
> > -----Original Message-----
> > From: Kostas Kloudas [mailto:k.kloudas@data-artisans.com]
> > Sent: Thursday, August 11, 2016 11:22 AM
> > To: user@flink.apache.org
> > Subject: Re: flink - Working with State example
> >
> > Hi Buvana,
> >
> > At first glance, your snapshotState() should return a Double.
> >
> > Kostas
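
Spelled out, the fix is to narrow the return type so it matches the Checkpointed<Double> declaration; a sketch of just the affected methods:

    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        // Must return the type parameter of Checkpointed<Double>, not the
        // broader java.io.Serializable, or the override does not match.
        return prev_tuple;
    }

    @Override
    public void restoreState(Double state) {
        prev_tuple = state;
    }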
> >
> >> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <
> buvana.ramanan@nokia-bell-labs.com> wrote:
> >>
> >> Thank you Kostas & Ufuk. I get the following compilation error when I use
> >> the Checkpointed interface. Pasting the code & message as follows:
> >>
> >> Is the Serializable definition supposed to be from java.io.Serializable
> or somewhere else?
> >>
> >> Thanks again,
> >> Buvana
> >>
> >> ================================================================================
> >> Code:
> >>
> >> import org.apache.flink.api.common.functions.FlatMapFunction;
> >> import org.apache.flink.api.common.functions.MapFunction;
> >> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> >> import org.apache.flink.configuration.Configuration;
> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >> import org.apache.flink.api.common.typeinfo.TypeHint;
> >> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> >>
> >> import java.io.Serializable;
> >> import org.apache.flink.api.java.tuple.Tuple2;
> >> import org.apache.flink.streaming.api.datastream.DataStream;
> >> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> >> import org.apache.flink.util.Collector;
> >>
> >> import java.util.Properties;
> >>
> >> /**
> >> * Created by buvana on 8/9/16.
> >> */
> >> public class stateful {
> >>   private static String INPUT_KAFKA_TOPIC = null;
> >> --- skipping main(), as it is the same as before except for the class name ---
> >>       public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
> >>               implements Checkpointed<Double> {
> >>
> >>       private Double prev_tuple = null;
> >>
> >>       @Override
> >>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
> >>           try {
> >>               Double value = Double.parseDouble(incString);
> >>               System.out.println("value = " + value);
> >>               System.out.println(prev_tuple);
> >>
> >>               Double value2 = value - prev_tuple;
> >>               prev_tuple = value;
> >>
> >>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
> >>               tp.setField(INPUT_KAFKA_TOPIC, 0);
> >>               tp.setField(value2, 1);
> >>               out.collect(tp);
> >>           } catch (NumberFormatException e) {
> >>               System.out.println("Could not convert to Float" + incString);
> >>               System.err.println("Could not convert to Float" + incString);
> >>           }
> >>       }
> >>       @Override
> >>       public void open(Configuration config) {
> >>           if (prev_tuple == null) {
> >>               // only recreate if null
> >>               // restoreState will be called before open()
> >>               // so this will already set the sum to the restored value
> >>               prev_tuple = new Double("0.0");
> >>           }
> >>       }
> >>
> >>       @Override
> >>       public Serializable snapshotState(
> >>               long checkpointId,
> >>               long checkpointTimestamp) throws Exception {
> >>           return prev_tuple;
> >>       }
> >>
> >>
> >>       @Override
> >>       public void restoreState(Double state) {
> >>           prev_tuple = state;
> >>       }
> >>   }
> >> }
> >> ================================================================================
> >> ERROR message while building:
> >>
> >> $ mvn clean package
> >> [INFO] Scanning for projects...
> >> [INFO]
> >> [INFO] ------------------------------------------------------------------------
> >> [INFO] Building Flink Quickstart Job 0.1
> >> [INFO] ------------------------------------------------------------------------
> >> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
> >> [INFO]
> >> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> >> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> >> [INFO]
> >> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
> >> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> >> [INFO] Copying 1 resource
> >> [INFO]
> >> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
> >> [INFO] Changes detected - recompiling the module!
> >> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> >> [INFO] -------------------------------------------------------------
> >> [ERROR] COMPILATION ERROR :
> >> [INFO] -------------------------------------------------------------
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> >>   return type java.io.Serializable is not compatible with java.lang.Double
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> >> [INFO] 3 errors
> >> [INFO] -------------------------------------------------------------
> >> [INFO] ------------------------------------------------------------------------
> >> [INFO] BUILD FAILURE
> >> [INFO] ------------------------------------------------------------------------
> >> [INFO] Total time: 2.171s
> >> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
> >> [INFO] Final Memory: 26M/660M
> >> [INFO] ------------------------------------------------------------------------
> >> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> >> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> >> [ERROR] -> [Help 1]
> >> [ERROR]
> >> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> >> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> >> [ERROR]
> >> [ERROR] For more information about the errors and possible solutions, please read the following articles:
> >> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> >> ================================================================================
> >>
> >> -----Original Message-----
> >> From: Kostas Kloudas [mailto:k.kloudas@data-artisans.com]
> >> Sent: Thursday, August 11, 2016 10:34 AM
> >> To: user@flink.apache.org
> >> Subject: Re: flink - Working with State example
> >>
> >> Exactly as Ufuk suggested: if you are not grouping your stream by key,
> >> you should use the Checkpointed interface.
> >>
> >> The reason I asked earlier whether you are using keyBy() is that keyBy()
> >> is what implicitly sets the key serializer and scopes your (keyed) state
> >> to a specific key.
> >>
> >> If there is no keying, then keyed state cannot be used and the
> Checkpointed interface should be used instead.
> >>
> >> Let us know if you need anything else.
> >>
> >> Kostas
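
For the keyed alternative, a minimal sketch (the constant key is hypothetical, purely to show how keyBy() scopes state and sets the key serializer implicitly; requires org.apache.flink.api.java.functions.KeySelector):

    stream.keyBy(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) {
            // Hypothetical: one constant key, so all records share one state slot.
            return INPUT_KAFKA_TOPIC;
        }
    }).flatMap(new Rec2Tuple2());

After keyBy(), the ValueState obtained in open() is scoped per key, and the "State key serializer has not been configured" error quoted below goes away.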
> >>
> >>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <uc...@apache.org> wrote:
> >>>
> >>> This only works for keyed streams; you have to use keyBy().
> >>>
> >>> You can use the Checkpointed interface instead
> >>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
> >>>
> >>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
> >>> <bu...@nokia-bell-labs.com> wrote:
> >>>> Hi Kostas,
> >>>>
> >>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where
> >>>> x[t] is the current value of the incoming sample and x[t-1] is the
> >>>> previous value. I store the current value in the state store
> >>>> (‘prev_tuple’) so that I can use it for the computation in the next
> >>>> cycle. As you may observe, I am not using keyBy. I am simply printing
> >>>> out the resultant tuple.
> >>>>
> >>>> It appears from the error message that I have to set the key serializer
> >>>> (and possibly the value serializer) for the state store. I am not sure
> >>>> how to do that…
> >>>>
> >>>> Thanks for your interest in helping,
> >>>>
> >>>> Regards,
> >>>>
> >>>> Buvana
> >>>> public class stateful {
> >>>>
> >>>>   private static String INPUT_KAFKA_TOPIC = null;
> >>>>   private static int TIME_WINDOW = 0;
> >>>>
> >>>>   public static void main(String[] args) throws Exception {
> >>>>
> >>>>       if (args.length < 2) {
> >>>>           throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
> >>>>                   + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
> >>>>       }
> >>>>
> >>>>       INPUT_KAFKA_TOPIC = args[0];
> >>>>       TIME_WINDOW = Integer.parseInt(args[1]);
> >>>>
> >>>>       Properties properties = new Properties();
> >>>>       properties.setProperty("bootstrap.servers", "localhost:9092");
> >>>>       properties.setProperty("zookeeper.connect", "localhost:2181");
> >>>>       properties.setProperty("group.id", "test");
> >>>>
> >>>>       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>>>       //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
> >>>>
> >>>>       DataStreamSource<String> stream = env
> >>>>               .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
> >>>>
> >>>>       // maps the data into Flink tuples
> >>>>       DataStream<Tuple2<String, Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
> >>>>
> >>>>       // write the result to the console or in a Kafka topic
> >>>>       streamTuples.print();
> >>>>
> >>>>       env.execute("plus one");
> >>>>   }
> >>>>
> >>>>   public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String, Double>> {
> >>>>
> >>>>       private transient ValueState<Tuple2<String, Double>> prev_tuple;
> >>>>
> >>>>       @Override
> >>>>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
> >>>>           try {
> >>>>               Double value = Double.parseDouble(incString);
> >>>>               System.out.println("value = " + value);
> >>>>               Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
> >>>>               System.out.println(prev_stored_tp);
> >>>>
> >>>>               Double value2 = value - prev_stored_tp.f1;
> >>>>               prev_stored_tp.f1 = value;
> >>>>               prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
> >>>>               prev_tuple.update(prev_stored_tp);
> >>>>
> >>>>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
> >>>>               tp.setField(INPUT_KAFKA_TOPIC, 0);
> >>>>               tp.setField(value2, 1);
> >>>>               out.collect(tp);
> >>>>           } catch (NumberFormatException e) {
> >>>>               System.out.println("Could not convert to Float" + incString);
> >>>>               System.err.println("Could not convert to Float" + incString);
> >>>>           }
> >>>>       }
> >>>>
> >>>>       @Override
> >>>>       public void open(Configuration config) {
> >>>>           ValueStateDescriptor<Tuple2<String, Double>> descriptor =
> >>>>                   new ValueStateDescriptor<>(
> >>>>                           "previous input value", // the state name
> >>>>                           TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
> >>>>                           Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
> >>>>           prev_tuple = getRuntimeContext().getState(descriptor);
> >>>>       }
> >>>>   }
> >>>> }
> >>>>
> >>>>
> >>>>
> >>>> From: Kostas Kloudas [mailto:k.kloudas@data-artisans.com]
> >>>> Sent: Thursday, August 11, 2016 5:45 AM
> >>>> To: user@flink.apache.org
> >>>> Subject: Re: flink - Working with State example
> >>>>
> >>>> Hello Buvana,
> >>>>
> >>>> Can you share a bit more detail on your operator and how you are using it?
> >>>> For example, are you using keyBy before using your custom operator?
> >>>>
> >>>> Thanks a lot,
> >>>>
> >>>> Kostas
> >>>>
> >>>>
> >>>>
> >>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
> >>>> <bu...@nokia-bell-labs.com> wrote:
> >>>>
> >>>>
> >>>>
> >>>> Hello,
> >>>>
> >>>> I am utilizing the code snippet in
> >>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html,
> >>>> and particularly the ‘open’ function, in my code:
> >>>>
> >>>> @Override
> >>>> public void open(Configuration config) {
> >>>>     ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
> >>>>             new ValueStateDescriptor<>(
> >>>>                     "average", // the state name
> >>>>                     TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
> >>>>                     Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
> >>>>     sum = getRuntimeContext().getState(descriptor);
> >>>> }
> >>>>
> >>>>
> >>>>
> >>>> When I run, I get the following error:
> >>>>
> >>>> Caused by: java.lang.RuntimeException: Error while getting state
> >>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
> >>>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
> >>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> >>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> >>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>>     at java.lang.Thread.run(Thread.java:745)
> >>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
> >>>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
> >>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
> >>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
> >>>>     ... 8 more
> >>>>
> >>>>
> >>>>
> >>>> Where do I define the key & value serializer for state?
> >>>>
> >>>>
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Buvana

RE: flink - Working with State example

Posted by "Ramanan, Buvana (Nokia - US)" <bu...@nokia-bell-labs.com>.
Hi Kostas,

I am trying to use FsStateBackend as the backend for storing state, and I configure it as follows in the code:

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
       env.enableCheckpointing(10000);

Everything else is the same as the code I shared with you previously.

When I execute, I see that a directory is created under /home/buvana/flink/checkpoints, but there is nothing under that directory.
I was expecting to find some file or subdirectory there.

Please explain.

Thanks,
Buvana


Re: flink - Working with State example

Posted by Kostas Kloudas <k....@data-artisans.com>.
No problem!

Regards,
Kostas

>>>> incString);
>>>> 
>>>>          }
>>>> 
>>>>      }
>>>> 
>>>> 
>>>> 
>>>>      @Override
>>>> 
>>>>      public void open(Configuration config) {
>>>> 
>>>>          ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>> 
>>>>                  new ValueStateDescriptor<>(
>>>> 
>>>>                          "previous input value", // the state name
>>>> 
>>>>                          TypeInformation.of(new 
>>>> TypeHint<Tuple2<String,
>>>> Double>>() {}), // type information
>>>> 
>>>>                          Tuple2.of("test topic", 0.0)); // default 
>>>> value of the state, if nothing was set
>>>> 
>>>>          prev_tuple = getRuntimeContext().getState(descriptor);
>>>> 
>>>>      }
>>>> 
>>>>  }
>>>> 
>>>> }
>>>> 
>>>> 
>>>> 
>>>> From: Kostas Kloudas [mailto:k.kloudas@data-artisans.com]
>>>> Sent: Thursday, August 11, 2016 5:45 AM
>>>> To: user@flink.apache.org
>>>> Subject: Re: flink - Working with State example
>>>> 
>>>> 
>>>> 
>>>> Hello Buvana,
>>>> 
>>>> 
>>>> 
>>>> Can you share a bit more details on your operator and how you are using it?
>>>> 
>>>> For example, are you using keyBy before using you custom operator?
>>>> 
>>>> 
>>>> 
>>>> Thanks a lot,
>>>> 
>>>> Kostas
>>>> 
>>>> 
>>>> 
>>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) 
>>>> <bu...@nokia-bell-labs.com> wrote:
>>>> 
>>>> 
>>>> 
>>>> Hello,
>>>> 
>>>> 
>>>> 
>>>> I am utilizing the code snippet in:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streamin
>>>> g /state.html and particularly ‘open’ function in my code:
>>>> 
>>>> @Override
>>>> 
>>>>  public void open(Configuration config) {
>>>> 
>>>>      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>> 
>>>>              new ValueStateDescriptor<>(
>>>> 
>>>>                      "average", // the state name
>>>> 
>>>>                      TypeInformation.of(new TypeHint<Tuple2<Long,
>>>> Long>>() {}), // type information
>>>> 
>>>>                      Tuple2.of(0L, 0L)); // default value of the 
>>>> state, if nothing was set
>>>> 
>>>>      sum = getRuntimeContext().getState(descriptor);
>>>> 
>>>>  }
>>>> 
>>>> 
>>>> 
>>>> When I run, I get the following error:
>>>> 
>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>> 
>>>>             at
>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.get
>>>> S
>>>> tate(StreamingRuntimeContext.java:120)
>>>> 
>>>>             at
>>>> wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>> 
>>>>             at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunctio
>>>> n
>>>> (FunctionUtils.java:38)
>>>> 
>>>>             at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.o
>>>> p
>>>> en(AbstractUdfStreamOperator.java:91)
>>>> 
>>>>             at
>>>> org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFl
>>>> a
>>>> tMap.java:41)
>>>> 
>>>>             at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators
>>>> (
>>>> StreamTask.java:314)
>>>> 
>>>>             at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTas
>>>> k
>>>> .java:214)
>>>> 
>>>>             at
>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>> 
>>>>             at java.lang.Thread.run(Thread.java:745)
>>>> 
>>>> Caused by: java.lang.Exception: State key serializer has not been 
>>>> configured in the config. This operation cannot use partitioned state.
>>>> 
>>>>             at
>>>> org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedSt
>>>> a
>>>> te(AbstractStateBackend.java:199)
>>>> 
>>>>             at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.getP
>>>> a
>>>> rtitionedState(AbstractStreamOperator.java:260)
>>>> 
>>>>             at
>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.get
>>>> S
>>>> tate(StreamingRuntimeContext.java:118)
>>>> 
>>>>             ... 8 more
>>>> 
>>>> 
>>>> 
>>>> Where do I define the key & value serializer for state?
>>>> 
>>>> 
>>>> 
>>>> Thanks,
>>>> 
>>>> Buvana
>>>> 
>>>> 
>> 
> 


RE: flink - Working with State example

Posted by "Ramanan, Buvana (Nokia - US)" <bu...@nokia-bell-labs.com>.
Kostas,
Good catch! That makes it work! Thank you so much for the help.
Regards,
Buvana

-----Original Message-----
From: Kostas Kloudas [mailto:k.kloudas@data-artisans.com] 
Sent: Thursday, August 11, 2016 11:22 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hi Buvana, 

At a first glance, your snapshotState() should return a Double.

Kostas



Re: flink - Working with State example

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Buvana, 

At a first glance, your snapshotState() should return a Double.

Kostas
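
Concretely, the override must return the Checkpointed type parameter itself instead
of java.io.Serializable. A minimal sketch of that fix, assuming the Flink 1.1
Checkpointed<Double> interface and the prev_tuple field from the code quoted later
in this thread:

    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp)
            throws Exception {
        // Checkpointed<Double> declares "Double snapshotState(long, long)",
        // so narrowing the return type from Serializable to Double resolves
        // the "return type ... is not compatible" compile error.
        return prev_tuple;
    }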



RE: flink - Working with State example

Posted by "Ramanan, Buvana (Nokia - US)" <bu...@nokia-bell-labs.com>.
Thank you Kostas & Ufuk. I run into the following compilation error when I use the Checkpointed interface. The code and the error message are pasted below:

Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?

Thanks again,
Buvana

================================================================================================================
Code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.functions.RichFlatMapFunction;

import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Created by buvana on 8/9/16.
 */
public class stateful {
    private static String INPUT_KAFKA_TOPIC = null;
---
--- skipping the main as it’s the same as before except for class name change -------------
---
    public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
            implements Checkpointed<Double> {

        private Double prev_tuple = null;

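        // Emits (INPUT_KAFKA_TOPIC, x[t] - x[t-1]): prev_tuple carries the
        // previous sample x[t-1] between elements, and the Checkpointed
        // callbacks below snapshot and restore it across failures.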
        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
            try {
                Double value = Double.parseDouble(incString);
                System.out.println("value = " + value);
                System.out.println(prev_tuple);

                Double value2 = value - prev_tuple;
                prev_tuple = value;

                Tuple2<String, Double> tp = new Tuple2<String, Double>();
                tp.setField(INPUT_KAFKA_TOPIC, 0);
                tp.setField(value2, 1);
                out.collect(tp);
            } catch (NumberFormatException e) {
                System.out.println("Could not convert to Float" + incString);
                System.err.println("Could not convert to Float" + incString);
            }
        }
        @Override
        public void open(Configuration config) {
            if (prev_tuple == null) {
                // only recreate if null
                // restoreState will be called before open()
                // so this will already set the sum to the restored value
                prev_tuple = new Double("0.0");
            }
        }

        @Override
        public Serializable snapshotState(
                long checkpointId,
                long checkpointTimestamp) throws Exception {
            return prev_tuple;
        }


        @Override
        public void restoreState(Double state) {
            prev_tuple = state;
        }
    }
}
===============================================================================================================
ERROR message while building:

$ mvn clean package
[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Quickstart Job 0.1
[INFO] ------------------------------------------------------------------------
[WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
[INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
[INFO] 
[INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR : 
[INFO] -------------------------------------------------------------
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
  return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[INFO] 3 errors 
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.171s
[INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
[INFO] Final Memory: 26M/660M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
================================================================================================================


Re: flink - Working with State example

Posted by Kostas Kloudas <k....@data-artisans.com>.
Exactly as Ufuk suggested: if you are not grouping your stream by key,
you should use the Checkpointed interface.

The reason I asked earlier whether you are using keyBy() is that keyBy() is what
implicitly sets the key serializer and scopes your (keyed) state to a specific key.

If there is no keying, then keyed state cannot be used, and the Checkpointed
interface should be used instead.

Let us know if you need anything else.

Kostas



Re: flink - Working with State example

Posted by Ufuk Celebi <uc...@apache.org>.
This only works for keyed streams; you have to use keyBy() first.

You can use the Checkpointed interface instead
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
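
If you do want keyed state here, the keyBy() has to come before the stateful
operator; a sketch with a constant key, so all samples share one state entry
(KeySelector is org.apache.flink.api.java.functions.KeySelector):

stream
        .keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) {
                return INPUT_KAFKA_TOPIC; // constant key: every record lands in the same partition
            }
        })
        .flatMap(new Rec2Tuple2()) // keyed ValueState now works in open()/flatMap()
        .print();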


RE: flink - Working with State example

Posted by "Ramanan, Buvana (Nokia - US)" <bu...@nokia-bell-labs.com>.
Hi Kostas,

Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t] is the current value of the incoming sample and x[t-1] is the previous value. I store the current value in the state store (‘prev_tuple’) so that I can use it for the computation in the next cycle. As you may observe, I am not using keyBy; I am simply printing out the resultant tuple.

It appears from the error message that I have to set the key serializer (and possibly value serializer) for the state store. I am not sure how to do that…

Thanks for your interest in helping,


Regards,
Buvana

public class stateful {
    private static String INPUT_KAFKA_TOPIC = null;
    private static int TIME_WINDOW = 0;

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
                    + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
        }

        INPUT_KAFKA_TOPIC = args[0];
        TIME_WINDOW = Integer.parseInt(args[1]);

        Properties properties = null;

        properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));

        DataStreamSource<String> stream = env
                .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));

        // maps the data into Flink tuples
        DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());

        // write the result to the console or in a Kafka topic
        streamTuples.print();

        env.execute("plus one");

    }

    public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
        private transient ValueState<Tuple2<String, Double>> prev_tuple;

        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
            try {
                Double value = Double.parseDouble(incString);
                System.out.println("value = " + value);
                Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
                System.out.println(prev_stored_tp);

                Double value2 = value - prev_stored_tp.f1;
                prev_stored_tp.f1 = value;
                prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
                prev_tuple.update(prev_stored_tp);

                Tuple2<String, Double> tp = new Tuple2<String, Double>();
                tp.setField(INPUT_KAFKA_TOPIC, 0);
                tp.setField(value2, 1);
                out.collect(tp);

            } catch (NumberFormatException e) {
                System.out.println("Could not convert to Float" + incString);
                System.err.println("Could not convert to Float" + incString);
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<String, Double>> descriptor =
                    new ValueStateDescriptor<>(
                            "previous input value", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
                            Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
            prev_tuple = getRuntimeContext().getState(descriptor);
        }
    }
}

From: Kostas Kloudas [mailto:k.kloudas@data-artisans.com]
Sent: Thursday, August 11, 2016 5:45 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hello Buvana,

Can you share a bit more details on your operator and how you are using it?
For example, are you using keyBy before using you custom operator?

Thanks a lot,
Kostas

On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) <bu...@nokia-bell-labs.com>> wrote:

Hello,

I am utilizing the code snippet in: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html and particularly ‘open’ function in my code:
@Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }

When I run, I get the following error:
Caused by: java.lang.RuntimeException: Error while getting state
               at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
               at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
               at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
               at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
               at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
               at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
               at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
               at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
               ... 8 more

Where do I define the key & value serializer for state?

Thanks,
Buvana


Re: flink - Working with State example

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hello Buvana,

Can you share a few more details on your operator and how you are using it?
For example, are you using keyBy before your custom operator?

Thanks a lot,
Kostas



flink - Working with State example

Posted by "Ramanan, Buvana (Nokia - US)" <bu...@nokia-bell-labs.com>.
Hello,

I am utilizing the code snippet in: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html and particularly the ‘open’ function in my code:
@Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }

When I run, I get the following error:
Caused by: java.lang.RuntimeException: Error while getting state
               at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
               at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
               at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
               at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
               at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
               at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
               at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
               at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
               ... 8 more

Where do I define the key & value serializer for state?

Thanks,
Buvana

Re: flink no class found error

Posted by Janardhan Reddy <ja...@olacabs.com>.
Can you please explain a bit more about the last option? We are using YARN, so
Guava might be on the classpath there.
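
(As a quick build-time check, something like this shows which of the
dependencies pulls Guava in transitively; a sketch:)

mvn dependency:tree -Dincludes=com.google.guava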


Re: flink no class found error

Posted by Robert Metzger <rm...@apache.org>.
Can you check if the jar you are submitting to the cluster contains a
different Guava than the one you use at compile time?

Also, it might happen that Guava is in your classpath, for example on some
YARN setups.

The last resort for resolving these issues is to use the maven-shade-plugin
and relocate the Guava version you need into your own namespace.
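
For example, a minimal relocation sketch for the fat-jar pom (the shaded
package prefix "com.mycompany.shaded" is illustrative):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- rewrite Guava's packages into your own namespace -->
            <pattern>com.google.common</pattern>
            <shadedPattern>com.mycompany.shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>

Relocation also rewrites the Guava references inside your own (and bundled
third-party) classes, which is why it works even when Guava is only a
transitive dependency.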


Re: flink no class found error

Posted by Janardhan Reddy <ja...@olacabs.com>.
#1 is thrown from user code.

We use Hadoop 2.7, which uses Guava 11.2, but our application uses 18.0. I
think Hadoop's Guava is getting picked up instead of ours.
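
One way to verify that is to print, from the failing user code, which jar the
class was actually loaded from (a debugging sketch):

// prints the location of the jar that provided Guava's Resources class at runtime
System.out.println(com.google.common.io.Resources.class
        .getProtectionDomain().getCodeSource().getLocation());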


Re: flink no class found error

Posted by Robert Metzger <rm...@apache.org>.
Hi Janardhan,

#1 Is the exception thrown from your user code, or from Flink?

#2 is most likely caused by a compiler/runtime version mismatch:
http://stackoverflow.com/questions/10382929/how-to-fix-java-lang-unsupportedclassversionerror-unsupported-major-minor-versi
You compiled the code with Java 8, but you are trying to run it with an older JVM.
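
A quick way to confirm (a sketch; run the first command where the jar is
built and the second on each YARN node):

# class file version baked into the compiled class (52 = Java 8)
javap -verbose com/olacabs/fabric/common/Metadata.class | grep "major version"

# version of the JVM that actually runs the tasks on the node
java -version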
