Posted to user@beam.apache.org by Maria-Irina Sandu <ms...@google.com> on 2021/03/30 19:55:23 UTC

Dataflow - Kafka error

I'm trying to write to a Kafka topic using the WriteToKafka transform from
apache_beam.io.kafka.
The error I get is:

> File "predict.py", line 162, in <module>
> run()
> File "predict.py", line 158, in run
> topic = 'int.fitbit_explore.video_recommendations'))
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 580, in __exit__
> self.result = self.run()
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 530, in run
> self._options).run(False)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 902, in from_runner_api
> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1252, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1252, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1252, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1252, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 116, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1229, in from_runner_api
> transform = ptransform.PTransform.from_runner_api(proto, context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py",
> line 733, in from_runner_api
> context)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py",
> line 1420, in from_runner_api_parameter
> pardo_payload.do_fn, context).serialized_dofn_data())
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/transforms/core.py",
> line 1493, in from_runner_api
> raise ValueError('Unexpected DoFn type: %s' % spec.urn)
> ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1


The pipeline looks like this:

> pipeline_options = PipelineOptions(argv)
> with beam.Pipeline(options=pipeline_options) as p:
>   _ = (p
>        | 'Create' >> beam.Create(['Start'])
>        | 'Read MDAU' >> beam.io.textio.ReadFromText(
>            "gs://fit-recommend-system-testpy/saved_model/dummy_mdau.txt")
>        | 'Predict' >> beam.ParDo(PredictDoFn())
>        | 'EncodeThrift' >> beam.ParDo(ThriftEncodeDoFn())
>        | 'WriteToKafka' >> WriteToKafka(
>            producer_config={'bootstrap.servers': '<fitbit-bootstrap-server>:9092'},
>            topic='<internal_fitbit_topic>'))


I replaced the bootstrap server and topic values with placeholders here
because I'm not sure whether I should share them.

The ThriftEncodeDoFn class seems to work. It produces a tuple of byte
strings and looks like this:

from typing import Iterable, Tuple

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport

# VideosAndRatings and RecommendationsKafkaMessage are our Thrift-generated
# classes; their import is omitted here.

class ThriftEncodeDoFn(beam.DoFn):
  def encode(self, element):
    video = VideosAndRatings()
    video.videoId = str(element['videoId'])
    video.rating = 5
    video.index = 1
    videosList = [video]
    recommendations = RecommendationsKafkaMessage()
    recommendations.userId = str(element['userId'])
    recommendations.videos = videosList
    recommendations.category = "DISCOVER_WORKOUTS"
    print(recommendations.userId, recommendations.category)
    # Serialize the Thrift struct to bytes via an in-memory transport.
    trans = TTransport.TMemoryBuffer()
    proto = TBinaryProtocol.TBinaryProtocol(trans)
    recommendations.write(proto)
    encoded_data = bytes(trans.getvalue())
    encoded_key = str(element['userId']).encode()
    return encoded_key, encoded_data

  def process(self, element) -> Iterable[Tuple[bytes, bytes]]:
    try:
      encoded_key, encoded_data = self.encode(element)
      yield (encoded_key, encoded_data)
    except Exception as e:
      print("encoding didn't work", e)
      yield TaggedOutput('encode_errors', f'element={element}, error={e}')
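
As an aside, for the 'encode_errors' tagged output to be usable downstream,
the ParDo would need with_outputs. A minimal sketch, not part of the actual
pipeline, where 'upstream' is a hypothetical stand-in for the PCollection
feeding the encoder:

results = upstream | 'EncodeThrift' >> beam.ParDo(
    ThriftEncodeDoFn()).with_outputs('encode_errors', main='encoded')
encoded = results.encoded        # main output: Tuple[bytes, bytes] for Kafka
errors = results.encode_errors   # tagged output: str diagnostics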

The command I use to run the pipeline is this:

python3 predict.py \
  --work-dir gs://fit-recommend-system-testpy/saved_model \
  --batch \
  --project fit-recommend-system-int \
  --runner DataflowRunner \
  --setup_file ./setup.py \
  --subnetwork https://www.googleapis.com/compute/v1/projects/<fitbit-internal-subnetwork> \
  --job_name prediction \
  --region us-central1 \
  --temp_location gs://fit-recommend-system-testpy/temp \
  --staging_location gs://fit-recommend-system-testpy/staging \
  --no_use_public_ips \
  --sdk_harness_container_image_overrides ".*java.*,gcr.io/cloud-dataflow/v1beta3/beam_java8_sdk:2.26.0" \
  --service_account_email runtime@fit-recommend-system-int.iam.gserviceaccount.com

And I have installed Apache Beam with python3 -m pip install
apache_beam[gcp]==2.26.0.

Any help with this is much appreciated!

Best regards,

Irina

Re: Dataflow - Kafka error

Posted by Chamikara Jayalath <ch...@google.com>.
Great :)

On Fri, Apr 2, 2021 at 3:01 AM Maria-Irina Sandu <ms...@google.com> wrote:

> I was able to submit the job after specifying the runner. Thanks a lot!

Re: Dataflow - Kafka error

Posted by Maria-Irina Sandu <ms...@google.com>.
I was able to submit the job after specifying the runner. Thanks a lot!

Re: Dataflow - Kafka error

Posted by Chamikara Jayalath <ch...@google.com>.
Dataflow multi-language pipelines need Runner v2, so you need to specify
the option "--experiments=use_runner_v2". Please see the example here
<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi>
for an exact command.
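
For example, the submit command from the original message would gain the
flag roughly like this (just a sketch; every other flag stays as it was):

python3 predict.py \
  --runner DataflowRunner \
  --experiments=use_runner_v2 \
  ... (remaining flags unchanged)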

On Wed, Mar 31, 2021 at 1:06 PM Maria-Irina Sandu <ms...@google.com> wrote:

> Thanks all for replying!
> I tried with both 2.27.0 and 2.28.0 and the error was the same. I managed
> to make some progress using the second option proposed by Cham and am now
> getting a new error.

Re: Dataflow - Kafka error

Posted by Maria-Irina Sandu <ms...@google.com>.
Thanks all for replying!
I tried with both 2.27.0 and 2.28.0 and the error was the same. I managed
to make some progress using the second option proposed by Cham and am now
getting the following error:

> Traceback (most recent call last):
> File "predict.py", line 163, in <module>
> run()
> File "predict.py", line 159, in run
> p.run(False).wait_until_finish()
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 559, in run
> return self.runner.run_pipeline(self, self._options)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
> line 638, in run_pipeline
> self.dataflow_client.create_job(self.job), self)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/utils/retry.py",
> line 260, in wrapper
> return fun(*args, **kwargs)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py",
> line 680, in create_job
> return self.submit_job_description(job)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/utils/retry.py",
> line 260, in wrapper
> return fun(*args, **kwargs)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/apiclient.py",
> line 747, in submit_job_description
> response = self._client.projects_locations_jobs.Create(request)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py",
> line 667, in Create
> config, request, global_params=global_params)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apitools/base/py/base_api.py",
> line 731, in _RunMethod
> return self.ProcessHttpResponse(method_config, http_response, request)
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apitools/base/py/base_api.py",
> line 737, in ProcessHttpResponse
> self.__ProcessHttpResponse(method_config, http_response, request))
> File
> "/Users/msandu/rec_models/predict_workouts/experiment_kafka/env/lib/python3.7/site-packages/apitools/base/py/base_api.py",
> line 604, in __ProcessHttpResponse
> http_response, method_config=method_config, request=request)
> apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <
> https://dataflow.googleapis.com/v1b3/projects/fit-recommend-system-int/locations/us-central1/jobs?alt=json>:
> response: <{'vary': 'Origin, X-Origin, Referer', 'content-type':
> 'application/json; charset=UTF-8', 'date': 'Wed, 31 Mar 2021 19:14:32 GMT',
> 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0',
> 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff',
> 'alt-svc': 'h3-29=":443"; ma=2592000,h3-T051=":443";
> ma=2592000,h3-Q050=":443"; ma=2592000,h3-Q046=":443";
> ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"',
> 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '288',
> '-content-encoding': 'gzip'}>, content <{
> "error": {
>     "code": 400,
>     "message": "Dataflow Runner v2 requires a valid FnApi job, Please
> resubmit your job with a valid configuration. Note that if using Templates,
> you may need to regenerate your template with the '--use_runner_v2'.",
>     "status": "INVALID_ARGUMENT"
>     }
> }

Re: Dataflow - Kafka error

Posted by Chamikara Jayalath <ch...@google.com>.
I would suggest also including a more recent fix [1] or using
the workaround mentioned in [2].

Thanks,
Cham

[1] https://github.com/apache/beam/pull/14306
[2]
https://issues.apache.org/jira/browse/BEAM-11862?focusedCommentId=17305920&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305920

Re: Dataflow - Kafka error

Posted by Brian Hulette <bh...@google.com>.
+Chamikara Jayalath <ch...@google.com>

Could you try with Beam 2.27.0 or 2.28.0? I think this PR [1] may have
addressed the issue. It avoids the problematic code when the pipeline is
multi-language [2].

[1] https://github.com/apache/beam/pull/13536
[2]
https://github.com/apache/beam/blob/7eff49fae34e8d3c50716f5da14fa6bcc607fc67/sdks/python/apache_beam/pipeline.py#L524
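
For reference, upgrading would look something like this (mirroring the
install command from the original message):

python3 -m pip install "apache_beam[gcp]==2.28.0"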
