Posted to users@camel.apache.org by Andrea Cosentino <an...@gmail.com> on 2021/01/04 22:18:54 UTC

Re: Camel S3SourceConnector and Effect of Topic cleanup.policy "compact" vs. "delete"

Is this with the aws-s3 connector or aws2-s3?

On Mon, Jan 4, 2021, 23:05 Arundhati Bhende <ar...@prudential.com>
wrote:

> Hi, I am testing the connector with different cleanup policies for the
> topic.
>
> If the topic cleanup.policy is set to "delete", the connector works
> correctly and I am able to access the messages in the topic.
>
> If the topic cleanup.policy is set to "compact", the Connect task fails
> with the error below.
>
> I am trying to find out why this happens. Can someone please explain?
>
>       trace: org.apache.kafka.connect.errors.ConnectException:
> OffsetStorageWriter is already flushing
>           at
> org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
>           at
> org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:438)
>           at
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
>           at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
>           at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
>           at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>           at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>           at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>           at java.lang.Thread.run(Thread.java:748)
>
> Thank you
>
>
>
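
The thread below never pins down the root cause, so a hypothesis rather than a confirmed diagnosis: a compacted topic requires every record to carry a non-null key, and the broker rejects keyless records written to such a topic. A source task whose sends keep failing can back up until one offset commit overlaps the previous one, which matches the "OffsetStorageWriter is already flushing" symptom. Two quick checks, sketched below assuming the Kafka CLI tools are on the PATH and a broker at localhost:9092:

    # Inspect the topic's current cleanup policy:
    bin/kafka-configs.sh --bootstrap-server localhost:9092 \
      --entity-type topics --entity-name TEST-S3-SOURCE-DZONE-POC --describe

    # Print keys alongside values to see whether the connector's records
    # carry keys at all (a "null" key would be rejected under compaction):
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic TEST-S3-SOURCE-DZONE-POC --from-beginning \
      --property print.key=true --max-messages 10

    # Flip the existing topic from compact to delete without recreating it:
    bin/kafka-configs.sh --bootstrap-server localhost:9092 \
      --entity-type topics --entity-name TEST-S3-SOURCE-DZONE-POC \
      --alter --add-config cleanup.policy=delete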

Re: Camel S3SourceConnector and Effect of Topic cleanup.policy "compact" vs. "delete"

Posted by Arundhati Bhende <ar...@prudential.com>.
Thank you.  I do not actually need compaction; the topic I was given for the POC just happened to be defined as compact. When I was hitting a string of errors, which I can now attribute to mistakes in my own connector configuration, I decided to create a new topic, but I reused the same topic-creation script we already had and changed only the topic name.

Out of curiosity I will still explore the worker configuration, but otherwise my initial POC is done.

Thanks again


Re: Camel S3SourceConnector and Effect of Topic cleanup.policy "compact" vs. "delete"

Posted by Andrea Cosentino <an...@yahoo.com.INVALID>.
The options I reported are Kafka Connect worker settings, I believe, so they should be set at the worker level and not in the connector config.

That said, without more information on your configuration and on why you need compaction, there is not much more we can do.

--
Andrea Cosentino 
----------------------------------
Apache Camel PMC Chair
Apache Karaf Committer
Apache Servicemix PMC Member
Email: ancosen1985@yahoo.com
Twitter: @oscerd2
Github: oscerd

Re: Camel S3SourceConnector and Effect of Topic cleanup.policy "compact" vs. "delete"

Posted by Arundhati Bhende <ar...@prudential.com>.
Thanks.  I tried those options in many combinations but kept getting the same error; I am asking in order to get a better understanding.

I used the same connector configuration as below. I created the topic with cleanup.policy=compact and kept getting the error below; I then changed only the cleanup policy to delete and it worked, with every other topic configuration parameter kept exactly the same. So I am trying to understand why the topic must have cleanup.policy=delete.

DATA=$( cat << EOF
{
        "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "camel.source.maxPollDuration": "10000",
        "topics": "TEST-S3-SOURCE-DZONE-POC",
        "camel.source.path.bucketNameOrArn": "push-json-poc",
        "camel.component.aws-s3.region": "US_EAST_1",
        "tasks.max": "1",
        "camel.source.endpoint.useIAMCredentials": "true",
        "camel.source.endpoint.autocloseBody": "true"
}
EOF
)
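
For completeness, this config map is what gets pushed to the Kafka Connect REST API. A minimal sketch of that step, assuming a worker on localhost:8083; the connector name "s3-source-poc" is chosen here purely for illustration:

    curl -X PUT -H "Content-Type: application/json" \
         --data "$DATA" \
         http://localhost:8083/connectors/s3-source-poc/config

(PUT on /connectors/<name>/config takes the bare config map shown above and creates the connector if it does not exist yet.)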

Thanks





Re: Camel S3SourceConnector and Effect of Topic cleanup.policy "compact" vs. "delete"

Posted by Andrea Cosentino <an...@yahoo.com.INVALID>.
This seems related more to the Kafka Connect configuration than to the connector itself. I guess you'll need to tune the options related to offset flushing, like:

offset.flush.timeout.ms
offset.flush.interval.ms
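
Both of these are Kafka Connect worker settings, so they belong in the worker properties file rather than in the connector JSON. A sketch with illustrative values, not recommendations:

    # connect-distributed.properties (or connect-standalone.properties)
    # How often the worker tries to commit source offsets (default 60000 ms):
    offset.flush.interval.ms=10000
    # How long a single offset flush may run before it is abandoned (default 5000 ms):
    offset.flush.timeout.ms=20000

Raising the timeout gives a slow flush more room to finish; lowering the interval makes commits smaller and more frequent.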

--
Andrea Cosentino 
----------------------------------
Apache Camel PMC Chair
Apache Karaf Committer
Apache Servicemix PMC Member
Email: ancosen1985@yahoo.com
Twitter: @oscerd2
Github: oscerd

Re: Camel S3SourceConnector and Effect of Topic cleanup.policy "compact" vs. "delete"

Posted by Arundhati Bhende <ar...@prudential.com>.
The aws-s3 connector, not aws2-s3.
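
Worth noting for later readers: the Camel aws-s3 component is deprecated in favor of aws2-s3, which is built on the AWS SDK v2, so new work would normally target the aws2-s3 source connector. A sketch of the equivalent config, assuming the aws2-s3 connector archive is on the plugin path; the class name and property prefix below follow the camel-kafka-connector naming pattern, and the credential options differ from aws-s3, so check both against the connector catalog for your version:

    {
        "connector.class": "org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics": "TEST-S3-SOURCE-DZONE-POC",
        "camel.source.path.bucketNameOrArn": "push-json-poc",
        "camel.component.aws2-s3.region": "US_EAST_1",
        "tasks.max": "1"
    }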
