You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Somnath Chouwdhury <so...@datametica.com> on 2023/07/20 16:43:33 UTC

Fetch Truststore File Inside a Flex Template image for Confluent Kafka

Hi,

We are trying to store the truststore.jks file inside the Flex Template
Docker image, but when the pipeline uses it, the file cannot be found.

We pulled the image and can see that the file is present in the container
at /tmp/trust.jks, but when the pipeline uses it we get the following error.


*Error Logs:*
Error message from worker: generic::unknown:
org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct
kafka consumer
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:67)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:134)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/tmp/trust.jks of type JKS
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 42 more
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.KafkaException: Failed to load SSL keystore
/tmp/trust.jks of type JKS
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:154)
... 46 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
keystore /tmp/trust.jks of type JKS
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:155)
... 49 more
Caused by: java.nio.file.NoSuchFileException: /tmp/trust.jks
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
java.base/java.nio.file.Files.newByteChannel(Files.java:371)
java.base/java.nio.file.Files.newByteChannel(Files.java:422)
java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
java.base/java.nio.file.Files.newInputStream(Files.java:156)
org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
... 50 more

*Here is the docker image that we used*

FROM gcr.io/dataflow-templates-base/python38-template-launcher-base
RUN apt-get update && \
    apt-get install -y openjdk-11-jdk

ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64


ARG WORKDIR=/dataflow/python/using_flex_template_adv3
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY . .
COPY trust.jks /tmp/trust.jks

ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
ENV PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
RUN apt-get update \
    && apt-get install -y libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r $PYTHON_REQUIREMENTS_FILE \
    # Download the requirements to speed up launching the Dataflow job.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $PYTHON_REQUIREMENTS_FILE

RUN pip install --no-cache-dir apache-beam[gcp]==2.43.0
ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]



*Code Used:*

read = p_input | "Read Json from Kafka" >> ReadFromKafka(
    consumer_config=self.consumer_config,
    topics=[self.topic],
    commit_offset_in_finalize=False,
    with_metadata=False
)


*consumer_config contents:*

bootstrap.servers: bs:9095
group.id: "Group1"
ssl.truststore.location: '/tmp/trust.jks'
sasl.mechanism: SCRAM-SHA-512
security.protocol: SASL_SSL
sasl.jaas.config: ""



We are trying to run this pipeline on GCP Dataflow with the Apache Beam
Python SDK.



-- 
Thanks and Regards,

*Somnath Chouwdhury*Associate Engineer IV - Solutioning & Software
Engineering
+91 9284112897 | www.datametica.com
Datametica Solutions Private Limited, Pune - India


<https://www.linkedin.com/company/datametica>
<https://twitter.com/DataMetica>
<https://instagram.com/datametica_lifeatdm?utm_medium=copy_link>
<https://www.facebook.com/datametica1>
<https://www.youtube.com/channel/UCTBR2-f1mDSSpwD0BteETPQ>

Re: Fetch Truststore File Inside a Flex Template image for Confluent Kafka

Posted by Bruno Volpato via user <us...@beam.apache.org>.
Hi Somnath,

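The problem here seems to be that you have */tmp/trust.jks* available when
creating the pipeline (in the launcher), but it apparently is not available
in the worker VMs (SDK container). The Flex Template image runs the
launcher, while the Kafka read runs in the Java SDK harness on the workers
(see the org.apache.beam.fn.harness frames in the stack trace). Files copied
into the launcher image are therefore not visible there.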

In Java we've been using JvmInitializers for Templates to copy files from
GCS when the worker starts up (see CommonTemplateJvmInitializer
<https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/common/src/main/java/com/google/cloud/teleport/v2/common/CommonTemplateJvmInitializer.java#L75-L81>).
I don't know if there's a corresponding simple way to achieve that
in Python.
More specifically for Kafka, the Java API lets us supply a function that
builds the KafkaConsumer (KafkaIO.Read#withConsumerFactoryFn:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withConsumerFactoryFn-org.apache.beam.sdk.transforms.SerializableFunction-),
but this does not appear to be exposed in Python's interface either.

Curious how others are solving this issue.
(It would be awesome if we could handle GCS references transparently when
using the IO, to make this much simpler)


Best,
Bruno




On Thu, Jul 20, 2023 at 12:44 PM Somnath Chouwdhury <
somnath.c@datametica.com> wrote:

> Hi,
>
> We are trying to store the truststore.jks file inside the Flex Template
> Docker but while using it in the pipeline we are unable to locate it.
>
> we tried pulling the image and we can see the file is present in the
> docker at \tmp\trust.jks but while using the same we get the following
> error.
>
>
> *Error Logs:*
> Error message from worker: generic::unknown:
> org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
>
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>
> org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
> org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
>
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>
> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
>
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
>
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct
> kafka consumer
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:67)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:134)
>
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
>
> org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
>
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
>
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>
> org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
> org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
>
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>
> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>
> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
> /tmp/trust.jks of type JKS
>
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158)
>
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
>
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
>
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
>
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
> ... 42 more
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
> /tmp/trust.jks of type JKS
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
>
> org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
>
> org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:154)
> ... 46 more
> Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
> keystore /tmp/trust.jks of type JKS
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:155)
> ... 49 more
> Caused by: java.nio.file.NoSuchFileException: /tmp/trust.jks
>
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
>
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
>
> java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
> java.base/java.nio.file.Files.newByteChannel(Files.java:371)
> java.base/java.nio.file.Files.newByteChannel(Files.java:422)
>
> java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420)
> java.base/java.nio.file.Files.newInputStream(Files.java:156)
>
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
> ... 50 more
>
> *Here is the docker image that we used*
>
> FROM gcr.io/dataflow-templates-base/python38-template-launcher-base
> RUN apt-get update && \
>     apt-get install -y openjdk-11-jdk
>
> ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64
>
>
> ARG WORKDIR=/dataflow/python/using_flex_template_adv3
> RUN mkdir -p ${WORKDIR}
> WORKDIR ${WORKDIR}
>
> COPY . .
> COPY trust.jks /tmp/trust.jks
>
> ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
> ENV PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
> ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
> RUN apt-get update \
>     && apt-get install -y libffi-dev git \
>     && rm -rf /var/lib/apt/lists/* \
>     # Upgrade pip and install the requirements.
>     && pip install --no-cache-dir --upgrade pip \
>     && pip install --no-cache-dir -r $PYTHON_REQUIREMENTS_FILE \
>     # Download the requirements to speed up launching the Dataflow job.
>     && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $PYTHON_REQUIREMENTS_FILE
>
> RUN pip install --no-cache-dir apache-beam[gcp]==2.43.0
> ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
>
>
>
> *Code Used:*
>
> read = p_input | "Read Json from Kafka" >> ReadFromKafka(
>     consumer_config=self.consumer_config,
>     topics=[self.topic],
>     commit_offset_in_finalize=False,
>     with_metadata=False
> )
>
>
> *consumer_config contents:*
>
> bootstrap.servers: bs:9095
> group.id: "Group1"
> ssl.truststore.location: '/tmp/trust.jks'
> sasl.mechanism: SCRAM-SHA-512
> security.protocol: SASL_SSL
> sasl.jaas.config: ""
>
>
>
> We are trying to run this pipeline on GCP Dataflow with apache beam python
> sdk.
>
>
>
> --
> Thanks and Regards,
>
> *Somnath Chouwdhury*Associate Engineer IV - Solutioning & Software
> Engineering
> +91 9284112897 <+91%2092841%2012897> | www.datametica.com
> Datametica Solutions Private Limited, Pune - India
>
>
> <https://www.linkedin.com/company/datametica>
> <https://twitter.com/DataMetica>
> <https://instagram.com/datametica_lifeatdm?utm_medium=copy_link>
> <https://www.facebook.com/datametica1>
> <https://www.youtube.com/channel/UCTBR2-f1mDSSpwD0BteETPQ>
>
>