You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Satoshi Yamada <sa...@gmail.com> on 2023/10/06 06:59:14 UTC

How to configure camel.sink.marshal ?

Hi community,

Let me repost this cause I don't get replies in github issue here
https://github.com/apache/camel-kafka-connector/issues/1568

I'm testing camel-azure-storage-blob-sink-kafka-connector from k8s.
So far, I've confirmed CamelHeader option works
<https://github.com/apache/camel-kafka-connector/issues/1530> and the log
is uploaded to an arbitrary path.
Now I'm testing camel.sink.marshal option to save some usage on Azure Blob
Storage, but haven't been successful yet.
I've followed some configuration like zipfile for camel.sink.marshal option,
but faced the following errors.

2023-09-27 03:14:35,322 ERROR [blob-connector|task-0]
WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and
unrecoverable exception. Task is being killed and will not recover
until manually restarted (org.apache.kafka.connect.runtime.Work
erTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and
start Camel context
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException:
org.apache.camel.VetoCamelContextStartException: Failure creating
route from template: ckcMarshal
        at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
        at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
        at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
        at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
        at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
        at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
        at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
        ... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure
creating route from template: ckcMarshal
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
        at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
        at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
        ... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException:
Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
        ... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to
create route ckcMarshal-4 at: >>>
Marshal[CustomDataFormat[{{marshal}}]] <<< in route:
Route(ckcMarshal-4)[From[kamelet://source?routeId=ckcMarshal...
because of Cannot find data format in registry with ref: zipfile
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
        at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
        at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
        at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
        ... 19 more
Caused by: java.lang.IllegalArgumentException: Cannot find data format
in registry with ref: zipfile
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:142)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
        at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
        at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
        at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
        at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
        ... 23 more
2023-09-27 03:14:35,323 INFO [blob-connector|task-0] Stopping
CamelSinkTask connector task
(org.apache.camel.kafkaconnector.CamelSinkTask)
[task-thread-blob-connector-0]


I've also tested with org.apache.camel.model.dataformat.ZipFileDataFormat,
then faced the following.

2023-09-27 03:13:40,704 ERROR [blob-connector|task-0]
WorkerSinkTask{id=blob-connector-0} Task threw an uncaught and
unrecoverable exception. Task is being killed and will not recover
until manually restarted (org.apache.kafka.connect.runtime.Work
erTask) [task-thread-blob-connector-0]
org.apache.kafka.connect.errors.ConnectException: Failed to create and
start Camel context
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:159)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:315)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException:
org.apache.camel.VetoCamelContextStartException: Failure creating
route from template: ckcMarshal
        at org.apache.camel.RuntimeCamelException.wrapRuntimeException(RuntimeCamelException.java:66)
        at org.apache.camel.support.service.BaseService.doFail(BaseService.java:413)
        at org.apache.camel.impl.engine.AbstractCamelContext.doFail(AbstractCamelContext.java:3550)
        at org.apache.camel.support.service.BaseService.fail(BaseService.java:342)
        at org.apache.camel.impl.engine.AbstractCamelContext.failOnStartup(AbstractCamelContext.java:5204)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2642)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:111)
        at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2649)
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
        at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
        at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
        at org.apache.camel.kafkaconnector.CamelSinkTask.start(CamelSinkTask.java:152)
        ... 9 more
Caused by: org.apache.camel.VetoCamelContextStartException: Failure
creating route from template: ckcMarshal
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:433)
        at org.apache.camel.impl.engine.AbstractCamelContext.doInit(AbstractCamelContext.java:2956)
        at org.apache.camel.support.service.BaseService.init(BaseService.java:83)
        at org.apache.camel.impl.engine.AbstractCamelContext.init(AbstractCamelContext.java:2630)
        ... 15 more
Caused by: org.apache.camel.component.kamelet.KameletNotFoundException:
Kamelet with id ckcMarshal not found in locations: classpath:/kamelets
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:421)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.onContextInitialized(KameletComponent.java:430)
        ... 18 more
Caused by: org.apache.camel.FailedToCreateRouteException: Failed to
create route ckcMarshal-1 at: >>>
Marshal[CustomDataFormat[{{marshal}}]] <<< in route:
Route(ckcMarshal-1)[From[kamelet://source?routeId=ckcMarshal...
because of Resolving datafor
mat: org.apache.camel.model.dataformat.ZipFileDataFormat detected type
conflict: Not a DataFormat implementation. Found:
org.apache.camel.model.dataformat.ZipFileDataFormat
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:241)
        at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:75)
        at org.apache.camel.impl.DefaultModelReifierFactory.createRoute(DefaultModelReifierFactory.java:49)
        at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:862)
        at org.apache.camel.component.kamelet.KameletComponent$LifecycleHandler.createRouteForEndpoint(KameletComponent.java:416)
        ... 19 more
Caused by: java.lang.IllegalArgumentException: Resolving dataformat:
org.apache.camel.model.dataformat.ZipFileDataFormat detected type
conflict: Not a DataFormat implementation. Found:
org.apache.camel.model.dataformat.ZipFileDataFormat
        at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormatFromResource(DefaultDataFormatResolver.java:76)
        at org.apache.camel.impl.engine.DefaultDataFormatResolver.createDataFormat(DefaultDataFormatResolver.java:47)
        at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$3(AbstractCamelContext.java:4473)
        at java.base/java.util.Optional.orElseGet(Optional.java:364)
        at org.apache.camel.impl.engine.AbstractCamelContext.lambda$resolveDataFormat$4(AbstractCamelContext.java:4473)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
        at org.apache.camel.impl.engine.AbstractCamelContext.resolveDataFormat(AbstractCamelContext.java:4464)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:140)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:115)
        at org.apache.camel.reifier.dataformat.CustomDataFormatReifier.doCreateDataFormat(CustomDataFormatReifier.java:35)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.createDataFormat(DataFormatReifier.java:266)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:151)
        at org.apache.camel.reifier.dataformat.DataFormatReifier.getDataFormat(DataFormatReifier.java:111)
        at org.apache.camel.reifier.MarshalReifier.createProcessor(MarshalReifier.java:35)
        at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:847)
        at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:588)
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:237)
        ... 23 more


Can I have a bit more detailed explanation on how to specify this option?
My configuration is something like below so far.

spec:
  class: org.apache.camel.kafkaconnector.azurestorageblobsink.CamelAzurestorageblobsinkSinkConnector
  config:
    camel.kamelet.azure-storage-blob-sink.accountName: "my_account"
    camel.kamelet.azure-storage-blob-sink.accessKey: "xxx"
    camel.kamelet.azure-storage-blob-sink.containerName: "my_container"
    camel.beans.aggregate:
"#class:org.apache.camel.kafkaconnector.aggregator.StringAggregator"
    camel.aggregation.size: 10
    camel.aggregation.timeout: 500000
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    topics: my-logs
    camel.sink.marshal: zipfile

I'm using camel-azure-storage-blob-sink-kafka-connector-3.18.2 and use
`uploadBlockBlob` for `CamelHeader.CamelAzureStorageBlobOperation`.

Thanks in advance,Satoshi