You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Niclas Hedhman <ni...@apache.org> on 2018/02/18 07:14:41 UTC

Only a single message processed

Hi,
I am pretty new to Flink, and I like what I see and have started to build
my first application using it.
I must be missing something very fundamental. I have a
FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
functions and terminated with the standard CassandraSink. I have try..catch
on all my own maps/filters and the first message in the queue is processed
after start-up, but any additional messages are ignore, i.e. not reaching
the first map(). Any additional messages are swallowed (i.e. consumed but
not forwarded).

I suspect that this is some type of de-duplication going on, since the
(test) producer of these messages. The producer provide different values on
each, but there is no "key" being passed to the KafkaProducer.

Is that required? And if so, why? Can I tell Flink or Flink's KafkaConsumer
to ingest all messages, and not try to de-duplicate them?

Thanks

--
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java

Re: Only a single message processed

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Niclas,

Glad that you got it working!
Thanks for sharing the problem and solution.

Best, Fabian

2018-02-19 9:29 GMT+01:00 Niclas Hedhman <ni...@apache.org>:

>
> (Sorry for the incoherent order and ramblings. I am writing this as I am
> trying to sort out what is going on...)
>
> 1. It is the first message to be processed in the Kafka topic. If I set
> the offset manually, it will pick up the message at that point, process it,
> and ignore all following messages.
>
> 2. Yes, the Kafka console consumer tool is spitting out the messages
> without problem. Btw, they are plain Strings, well, serialized JSON objects.
>
> 3. Code is a bit messy, but I have copied out the relevant parts below.
>
> I also noticed that a LOT of exceptions are thrown ("breakpoint on any
> exception"), mostly ClassCastException, classes not found and
> NoSuchMethodException, but nothing that bubbles up out of the internals. Is
> this part of Scala raping the JVM, or just the normal JVM class loading
> sequence (no wonder it is so slow)? Is that expected?
>
> I have tried to use both the ObjectMapper from Jackson proper, as well as
> the shadowed ObjectMapper in flink. No difference.
>
> Recap; Positioning Kafka consumer to message 8th from the last. Only that
> message is consumed, the remaining 7 are ignored/swallowed.
>
>
> Ok, so I think I have traced this down to something happening in the
> CassandraSink. There is a Exception being thrown somewhere, which I see as
> the Kafka09Fetcher.runFetchLoop()'s finally clause is called.
>
> Found it (hours later in debugging), on this line (Flink 1.4.1)
>
> org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258
>
> which contains
>
>     future.addListener(callbackListener, executor);  // IDEA says 'future' is of type DefaultResultSetFuture
>
> throws an Exception without stepping into the addListener() method. There
> is nothing catching the Exception (and I don't want to go down the rabbit
> hole of building from source), so I can't really say what Exception is
> being thrown. IDEA doesn't seem to report it, and the catch clauses in
> OperatorChain.pushToOperator() (ClassCastException and Exception) are in
> the call stack, but doesn't catch it, which could suggest an
> java.lang.Error, and NoClassDefFoundError comes to mind, since there are SO
> MANY classloading exception going on all the time.
>
> Hold on a second... There are TWO com.datastax.driver.core.DefaultResultSetFuture
> types in the classpath. One from the Cassandra client that I declared, and
> on from inside the flink-connector-cassandra_2.11 artifact...
>
> So will it work if I remove my own dependency declaration and that's it?
>
>
> YEEEEESSSSS!!! FInally.....
>
>
> SOLVED!
>
> -o-o-o-o-o-
>
> public static void main( String[] args )
>     throws Exception
> {
>     cli = CommandLine.populateCommand( new ServerCliOptions(), args );
>     initializeCassandra( cli );
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setMaxParallelism( 32768 );
> //    createPollDocPipeline( env );
>     createAdminPipeline( env );
>     env.execute( "schedule.poll" );
> }
>
>
>
> private static void createAdminPipeline( StreamExecutionEnvironment env )
> {
>     try
>     {
>         FlinkKafkaConsumer011<String> adminSource = createKafkaAdminSource();
>         SplitStream<AdminCommand> adminStream =
>             env.addSource( adminSource )
>                .name( "scheduler.admin" )
>                .map( value -> {
>                    try
>                    {
>                        return mapper.readValue( value, AdminCommand.class );
>                    }
>                    catch( Throwable e )
>                    {
>                        LOG.error( "Unexpected error deserializing AdminCommand", e );
>                        return null;
>                    }
>                } )
>                .name( "admin.command.read" )
>                .split( value -> singletonList( value.action() ) );
>
>         SingleOutputStreamOperator<Tuple3<List<String>, String, String>> insertStream =
>             adminStream.select( AdminCommand.CMD_SCHEDULE_INSERT )
>                        .map( new GetPollDeclaration() )
>                        .name( "scheduler.admin.insert" )
>                        .map( new PollDeclarationToTuple3Map() )
>                        .name( "scheduler.pollDeclToTuple3" )
>                        .filter( tuple -> tuple != null );
>
>         SingleOutputStreamOperator<Tuple3<List<String>, String, String>> deleteStream =
>             adminStream.select( AdminCommand.CMD_SCHEDULE_DELETE )
>                        .map( new GetPollDeclaration() )
>                        .name( "scheduler.admin.delete" )
>                        .map( new PollDeclarationToTuple3Map() )
>                        .name( "scheduler.pollDeclToTuple3" )
>                        .filter( tuple -> tuple != null );
>
>         CassandraSink.addSink( insertStream )
>                      .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
>                      .setQuery( String.format( INSERT_SCHEDULE, cli.cassandraKeyspace ) )
>                      .build();
>
>         CassandraSink.addSink( deleteStream )
>                      .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
>                      .setQuery( String.format( DELETE_SCHEDULE, cli.cassandraKeyspace ) )
>                      .build();
>     }
>     catch( Throwable e )
>     {
>         String message = "Unable to start Scheduling Admin";
>         LOG.error( message );
>         throw new RuntimeException( message, e );
>     }
> }
>
>
> private static class GetPollDeclaration
>     implements MapFunction<AdminCommand, PollDeclaration>
> {
>     private static final Logger LOG = LoggerFactory.getLogger( GetPollDeclaration.class );
>
>     @Override
>     public PollDeclaration map( AdminCommand command )
>         throws Exception
>     {
>         try
>         {
>             if( command == null )
>             {
>                 return null;
>             }
>             return (PollDeclaration) command.value();
>         }
>         catch( Throwable e )
>         {
>             LOG.error( "Unable to cast command data to PollDeclaration", e );
>             return null;
>         }
>     }
> }
>
>
> private static class PollDeclarationToTuple3Map
>     implements MapFunction<PollDeclaration, Tuple3<List<String>, String, String>>
> {
>     @Override
>     public Tuple3<List<String>, String, String> map( PollDeclaration decl )
>         throws Exception
>     {
>         try
>         {
>             if( decl == null )
>             {
>                 return null;
>             }
>             return new Tuple3<>( singletonList( mapper.writeValueAsString( decl ) ), decl.zoneId + ":" + decl.schedule, decl.url );
>         }
>         catch( Throwable e )
>         {
>             LOG.error( "Unable to cast command data to PollDeclaration", e );
>             return null;
>         }
>     }
> }
>
> Flink Dependencies;
>
> flink         : [
>         [group: "org.apache.flink", name: "flink-core", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-java", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-connector-cassandra_2.11", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-connector-kafka-0.11_2.11", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-queryable-state-runtime_2.11", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-streaming-java_2.11", version: flinkVersion],
>         [group: "org.apache.flink", name: "flink-streaming-scala_2.11", version: flinkVersion]
> ],
>
>
>
>
>
> On Sun, Feb 18, 2018 at 8:11 PM, Xingcan Cui <xi...@gmail.com> wrote:
>
>> Hi Niclas,
>>
>> About the second point you mentioned, was the processed message a random
>> one or a fixed one?
>>
>> The default startup mode for FlinkKafkaConsumer is
>> StartupMode.GROUP_OFFSETS, maybe you could try StartupMode.EARLIST while
>> debugging. Also, before that, you may try fetching the messages with the
>> Kafka console consumer tool to see whether they can be consumed completely.
>>
>> Besides, I wonder if you could provide the code for you Flink pipeline.
>> That’ll be helpful.
>>
>> Best,
>> Xingcan
>>
>>
>>
>> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman <ni...@apache.org> wrote:
>>
>>
>> So, the producer is run (at the moment) manually (command-line) one
>> message at a time.
>> Kafka's tooling (different consumer group) shows that a message is added
>> each time.
>>
>> Since my last post, I have also added a UUID as the key, and that didn't
>> make a difference, so you are likely correct about de-dup.
>>
>>
>> There is only a single partition on the topic, so it shouldn't be a
>> partitioning issue.
>>
>> I also noticed;
>> 1. Sending a message while consumer topology is running, after the first
>> message, then that message will be processed after a restart.
>>
>> 2. Sending many messages, while consumer is running, and then doing many
>> restarts will only process a single of those. No idea what happens to the
>> others.
>>
>> I am utterly confused.
>>
>> And digging in the internals are not for the faint-hearted, but the
>> kafka.poll() returns frequently with empty records.
>>
>> Will continue debugging that tomorrow...
>>
>>
>> Niclas
>>
>> On Feb 18, 2018 18:50, "Fabian Hueske" <fh...@gmail.com> wrote:
>>
>>> Hi Niclas,
>>>
>>> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
>>> "feature" is not implemented.
>>> Do you produce into the topic that you want to read or is the data in
>>> the topic static?
>>> If you do not produce in the topic while the consuming application is
>>> running, this might be an issue with the start position of the consumer
>>> [1].
>>>
>>> Best, Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>>> dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>
>>> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman <ni...@apache.org>:
>>>
>>>> Hi,
>>>> I am pretty new to Flink, and I like what I see and have started to
>>>> build my first application using it.
>>>> I must be missing something very fundamental. I have a
>>>> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
>>>> functions and terminated with the standard CassandraSink. I have try..catch
>>>> on all my own maps/filters and the first message in the queue is processed
>>>> after start-up, but any additional messages are ignore, i.e. not reaching
>>>> the first map(). Any additional messages are swallowed (i.e. consumed but
>>>> not forwarded).
>>>>
>>>> I suspect that this is some type of de-duplication going on, since the
>>>> (test) producer of these messages. The producer provide different values on
>>>> each, but there is no "key" being passed to the KafkaProducer.
>>>>
>>>> Is that required? And if so, why? Can I tell Flink or Flink's
>>>> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>>>>
>>>> Thanks
>>>>
>>>> --
>>>> Niclas Hedhman, Software Developer
>>>> http://zest.apache.org - New Energy for Java
>>>>
>>>
>>>
>>
>
>
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org - New Energy for Java
>

Re: Only a single message processed

Posted by Niclas Hedhman <ni...@apache.org>.
(Sorry for the incoherent order and ramblings. I am writing this as I am
trying to sort out what is going on...)

1. It is the first message to be processed in the Kafka topic. If I set the
offset manually, it will pick up the message at that point, process it, and
ignore all following messages.

2. Yes, the Kafka console consumer tool is spitting out the messages
without problem. Btw, they are plain Strings, well, serialized JSON objects.

3. Code is a bit messy, but I have copied out the relevant parts below.

I also noticed that a LOT of exceptions are thrown ("breakpoint on any
exception"), mostly ClassCastException, classes not found and
NoSuchMethodException, but nothing that bubbles up out of the internals. Is
this part of Scala raping the JVM, or just the normal JVM class loading
sequence (no wonder it is so slow)? Is that expected?

I have tried to use both the ObjectMapper from Jackson proper, as well as
the shadowed ObjectMapper in flink. No difference.

Recap; Positioning Kafka consumer to message 8th from the last. Only that
message is consumed, the remaining 7 are ignored/swallowed.


Ok, so I think I have traced this down to something happening in the
CassandraSink. There is a Exception being thrown somewhere, which I see as
the Kafka09Fetcher.runFetchLoop()'s finally clause is called.

Found it (hours later in debugging), on this line (Flink 1.4.1)

org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258

which contains

    future.addListener(callbackListener, executor);  // IDEA says
'future' is of type DefaultResultSetFuture

throws an Exception without stepping into the addListener() method. There
is nothing catching the Exception (and I don't want to go down the rabbit
hole of building from source), so I can't really say what Exception is
being thrown. IDEA doesn't seem to report it, and the catch clauses in
OperatorChain.pushToOperator() (ClassCastException and Exception) are in
the call stack, but doesn't catch it, which could suggest an
java.lang.Error, and NoClassDefFoundError comes to mind, since there are SO
MANY classloading exception going on all the time.

Hold on a second... There are TWO
com.datastax.driver.core.DefaultResultSetFuture types in the classpath. One
from the Cassandra client that I declared, and on from inside the
flink-connector-cassandra_2.11 artifact...

So will it work if I remove my own dependency declaration and that's it?


YEEEEESSSSS!!! FInally.....


SOLVED!

-o-o-o-o-o-

public static void main( String[] args )
    throws Exception
{
    cli = CommandLine.populateCommand( new ServerCliOptions(), args );
    initializeCassandra( cli );
    StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setMaxParallelism(
32768 );
//    createPollDocPipeline( env );
    createAdminPipeline( env );
    env.execute( "schedule.poll" );
}



private static void createAdminPipeline( StreamExecutionEnvironment env )
{
    try
    {
        FlinkKafkaConsumer011<String> adminSource = createKafkaAdminSource();
        SplitStream<AdminCommand> adminStream =
            env.addSource( adminSource )
               .name( "scheduler.admin" )
               .map( value -> {
                   try
                   {
                       return mapper.readValue( value, AdminCommand.class );
                   }
                   catch( Throwable e )
                   {
                       LOG.error( "Unexpected error deserializing
AdminCommand", e );
                       return null;
                   }
               } )
               .name( "admin.command.read" )
               .split( value -> singletonList( value.action() ) );

        SingleOutputStreamOperator<Tuple3<List<String>, String,
String>> insertStream =
            adminStream.select( AdminCommand.CMD_SCHEDULE_INSERT )
                       .map( new GetPollDeclaration() )
                       .name( "scheduler.admin.insert" )
                       .map( new PollDeclarationToTuple3Map() )
                       .name( "scheduler.pollDeclToTuple3" )
                       .filter( tuple -> tuple != null );

        SingleOutputStreamOperator<Tuple3<List<String>, String,
String>> deleteStream =
            adminStream.select( AdminCommand.CMD_SCHEDULE_DELETE )
                       .map( new GetPollDeclaration() )
                       .name( "scheduler.admin.delete" )
                       .map( new PollDeclarationToTuple3Map() )
                       .name( "scheduler.pollDeclToTuple3" )
                       .filter( tuple -> tuple != null );

        CassandraSink.addSink( insertStream )
                     .setHost( cli.primaryCassandraHost(),
cli.primaryCassandraPort() )
                     .setQuery( String.format( INSERT_SCHEDULE,
cli.cassandraKeyspace ) )
                     .build();

        CassandraSink.addSink( deleteStream )
                     .setHost( cli.primaryCassandraHost(),
cli.primaryCassandraPort() )
                     .setQuery( String.format( DELETE_SCHEDULE,
cli.cassandraKeyspace ) )
                     .build();
    }
    catch( Throwable e )
    {
        String message = "Unable to start Scheduling Admin";
        LOG.error( message );
        throw new RuntimeException( message, e );
    }
}


private static class GetPollDeclaration
    implements MapFunction<AdminCommand, PollDeclaration>
{
    private static final Logger LOG = LoggerFactory.getLogger(
GetPollDeclaration.class );

    @Override
    public PollDeclaration map( AdminCommand command )
        throws Exception
    {
        try
        {
            if( command == null )
            {
                return null;
            }
            return (PollDeclaration) command.value();
        }
        catch( Throwable e )
        {
            LOG.error( "Unable to cast command data to PollDeclaration", e );
            return null;
        }
    }
}


private static class PollDeclarationToTuple3Map
    implements MapFunction<PollDeclaration, Tuple3<List<String>,
String, String>>
{
    @Override
    public Tuple3<List<String>, String, String> map( PollDeclaration decl )
        throws Exception
    {
        try
        {
            if( decl == null )
            {
                return null;
            }
            return new Tuple3<>( singletonList(
mapper.writeValueAsString( decl ) ), decl.zoneId + ":" +
decl.schedule, decl.url );
        }
        catch( Throwable e )
        {
            LOG.error( "Unable to cast command data to PollDeclaration", e );
            return null;
        }
    }
}

Flink Dependencies;

flink         : [
        [group: "org.apache.flink", name: "flink-core", version: flinkVersion],
        [group: "org.apache.flink", name: "flink-java", version: flinkVersion],
        [group: "org.apache.flink", name:
"flink-connector-cassandra_2.11", version: flinkVersion],
        [group: "org.apache.flink", name:
"flink-connector-kafka-0.11_2.11", version: flinkVersion],
        [group: "org.apache.flink", name:
"flink-queryable-state-runtime_2.11", version: flinkVersion],
        [group: "org.apache.flink", name: "flink-streaming-java_2.11",
version: flinkVersion],
        [group: "org.apache.flink", name:
"flink-streaming-scala_2.11", version: flinkVersion]
],





On Sun, Feb 18, 2018 at 8:11 PM, Xingcan Cui <xi...@gmail.com> wrote:

> Hi Niclas,
>
> About the second point you mentioned, was the processed message a random
> one or a fixed one?
>
> The default startup mode for FlinkKafkaConsumer is
> StartupMode.GROUP_OFFSETS, maybe you could try StartupMode.EARLIST while
> debugging. Also, before that, you may try fetching the messages with the
> Kafka console consumer tool to see whether they can be consumed completely.
>
> Besides, I wonder if you could provide the code for you Flink pipeline.
> That’ll be helpful.
>
> Best,
> Xingcan
>
>
>
> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman <ni...@apache.org> wrote:
>
>
> So, the producer is run (at the moment) manually (command-line) one
> message at a time.
> Kafka's tooling (different consumer group) shows that a message is added
> each time.
>
> Since my last post, I have also added a UUID as the key, and that didn't
> make a difference, so you are likely correct about de-dup.
>
>
> There is only a single partition on the topic, so it shouldn't be a
> partitioning issue.
>
> I also noticed;
> 1. Sending a message while consumer topology is running, after the first
> message, then that message will be processed after a restart.
>
> 2. Sending many messages, while consumer is running, and then doing many
> restarts will only process a single of those. No idea what happens to the
> others.
>
> I am utterly confused.
>
> And digging in the internals are not for the faint-hearted, but the
> kafka.poll() returns frequently with empty records.
>
> Will continue debugging that tomorrow...
>
>
> Niclas
>
> On Feb 18, 2018 18:50, "Fabian Hueske" <fh...@gmail.com> wrote:
>
>> Hi Niclas,
>>
>> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
>> "feature" is not implemented.
>> Do you produce into the topic that you want to read or is the data in the
>> topic static?
>> If you do not produce in the topic while the consuming application is
>> running, this might be an issue with the start position of the consumer
>> [1].
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>
>> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman <ni...@apache.org>:
>>
>>> Hi,
>>> I am pretty new to Flink, and I like what I see and have started to
>>> build my first application using it.
>>> I must be missing something very fundamental. I have a
>>> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
>>> functions and terminated with the standard CassandraSink. I have try..catch
>>> on all my own maps/filters and the first message in the queue is processed
>>> after start-up, but any additional messages are ignore, i.e. not reaching
>>> the first map(). Any additional messages are swallowed (i.e. consumed but
>>> not forwarded).
>>>
>>> I suspect that this is some type of de-duplication going on, since the
>>> (test) producer of these messages. The producer provide different values on
>>> each, but there is no "key" being passed to the KafkaProducer.
>>>
>>> Is that required? And if so, why? Can I tell Flink or Flink's
>>> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>>>
>>> Thanks
>>>
>>> --
>>> Niclas Hedhman, Software Developer
>>> http://zest.apache.org - New Energy for Java
>>>
>>
>>
>


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java

Re: Only a single message processed

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Niclas,

About the second point you mentioned, was the processed message a random one or a fixed one? 

The default startup mode for FlinkKafkaConsumer is StartupMode.GROUP_OFFSETS, maybe you could try StartupMode.EARLIST while debugging. Also, before that, you may try fetching the messages with the Kafka console consumer tool to see whether they can be consumed completely.

Besides, I wonder if you could provide the code for you Flink pipeline. That’ll be helpful.

Best,
Xingcan



> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman <ni...@apache.org> wrote:
> 
> 
> So, the producer is run (at the moment) manually (command-line) one message at a time.
> Kafka's tooling (different consumer group) shows that a message is added each time.
> 
> Since my last post, I have also added a UUID as the key, and that didn't make a difference, so you are likely correct about de-dup.
> 
> 
> There is only a single partition on the topic, so it shouldn't be a partitioning issue.
> 
> I also noticed;
> 1. Sending a message while consumer topology is running, after the first message, then that message will be processed after a restart.
> 
> 2. Sending many messages, while consumer is running, and then doing many restarts will only process a single of those. No idea what happens to the others.
> 
> I am utterly confused.
> 
> And digging in the internals are not for the faint-hearted, but the kafka.poll() returns frequently with empty records.
> 
> Will continue debugging that tomorrow...
> 
> 
> Niclas
> 
> On Feb 18, 2018 18:50, "Fabian Hueske" <fhueske@gmail.com <ma...@gmail.com>> wrote:
> Hi Niclas,
> 
> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a "feature" is not implemented.
> Do you produce into the topic that you want to read or is the data in the topic static?
> If you do not produce in the topic while the consuming application is running, this might be an issue with the start position of the consumer [1]. 
> 
> Best, Fabian
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration <https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration>
> 
> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman <niclas@apache.org <ma...@apache.org>>:
> Hi,
> I am pretty new to Flink, and I like what I see and have started to build my first application using it.
> I must be missing something very fundamental. I have a FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap functions and terminated with the standard CassandraSink. I have try..catch on all my own maps/filters and the first message in the queue is processed after start-up, but any additional messages are ignore, i.e. not reaching the first map(). Any additional messages are swallowed (i.e. consumed but not forwarded).
> 
> I suspect that this is some type of de-duplication going on, since the (test) producer of these messages. The producer provide different values on each, but there is no "key" being passed to the KafkaProducer.
> 
> Is that required? And if so, why? Can I tell Flink or Flink's KafkaConsumer to ingest all messages, and not try to de-duplicate them?
> 
> Thanks
> 
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org <http://zest.apache.org/> - New Energy for Java
> 


Re: Only a single message processed

Posted by Niclas Hedhman <ni...@apache.org>.
So, the producer is run (at the moment) manually (command-line) one message
at a time.
Kafka's tooling (different consumer group) shows that a message is added
each time.

Since my last post, I have also added a UUID as the key, and that didn't
make a difference, so you are likely correct about de-dup.


There is only a single partition on the topic, so it shouldn't be a
partitioning issue.

I also noticed;
1. Sending a message while consumer topology is running, after the first
message, then that message will be processed after a restart.

2. Sending many messages, while consumer is running, and then doing many
restarts will only process a single of those. No idea what happens to the
others.

I am utterly confused.

And digging in the internals are not for the faint-hearted, but the
kafka.poll() returns frequently with empty records.

Will continue debugging that tomorrow...


Niclas

On Feb 18, 2018 18:50, "Fabian Hueske" <fh...@gmail.com> wrote:

> Hi Niclas,
>
> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
> "feature" is not implemented.
> Do you produce into the topic that you want to read or is the data in the
> topic static?
> If you do not produce in the topic while the consuming application is
> running, this might be an issue with the start position of the consumer
> [1].
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/dev/connectors/kafka.html#kafka-consumers-
> start-position-configuration
>
> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman <ni...@apache.org>:
>
>> Hi,
>> I am pretty new to Flink, and I like what I see and have started to build
>> my first application using it.
>> I must be missing something very fundamental. I have a
>> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
>> functions and terminated with the standard CassandraSink. I have try..catch
>> on all my own maps/filters and the first message in the queue is processed
>> after start-up, but any additional messages are ignore, i.e. not reaching
>> the first map(). Any additional messages are swallowed (i.e. consumed but
>> not forwarded).
>>
>> I suspect that this is some type of de-duplication going on, since the
>> (test) producer of these messages. The producer provide different values on
>> each, but there is no "key" being passed to the KafkaProducer.
>>
>> Is that required? And if so, why? Can I tell Flink or Flink's
>> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>>
>> Thanks
>>
>> --
>> Niclas Hedhman, Software Developer
>> http://zest.apache.org - New Energy for Java
>>
>
>

Re: Only a single message processed

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Niclas,

Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
"feature" is not implemented.
Do you produce into the topic that you want to read or is the data in the
topic static?
If you do not produce in the topic while the consuming application is
running, this might be an issue with the start position of the consumer
[1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

2018-02-18 8:14 GMT+01:00 Niclas Hedhman <ni...@apache.org>:

> Hi,
> I am pretty new to Flink, and I like what I see and have started to build
> my first application using it.
> I must be missing something very fundamental. I have a
> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
> functions and terminated with the standard CassandraSink. I have try..catch
> on all my own maps/filters and the first message in the queue is processed
> after start-up, but any additional messages are ignore, i.e. not reaching
> the first map(). Any additional messages are swallowed (i.e. consumed but
> not forwarded).
>
> I suspect that this is some type of de-duplication going on, since the
> (test) producer of these messages. The producer provide different values on
> each, but there is no "key" being passed to the KafkaProducer.
>
> Is that required? And if so, why? Can I tell Flink or Flink's
> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>
> Thanks
>
> --
> Niclas Hedhman, Software Developer
> http://zest.apache.org - New Energy for Java
>