Posted to user@beam.apache.org by jitendra sharma <ji...@gmail.com> on 2019/12/05 16:50:23 UTC

No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView

Hi,

I am running a Beam job using the Spark Runner and getting the error below:

19/12/05 22:14:36 WARN UnboundedDataset: Provided StorageLevel: MEMORY_ONLY
is ignored for streams, using the default level: StorageLevel(memory, 1
replicas)
[WARNING]
java.lang.IllegalStateException: No TransformEvaluator registered for
UNBOUNDED transform View.CreatePCollectionView
    at
org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions.checkState
(Preconditions.java:518)
    at
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded
(StreamingTransformTranslator.java:553)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate
(SparkRunner.java:464)

Any ideas or help would be appreciated.


Regards,
Jitendra Sharma

Re: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView

Posted by Luke Cwik <lc...@google.com>.
Have you set the --streaming flag/pipeline option?
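
For example, a minimal sketch of setting it programmatically (assuming a
typical main(String[] args); the variable names here are illustrative):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Equivalent to passing --streaming=true on the command line.
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
    options.setStreaming(true);
    Pipeline p = Pipeline.create(options);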

Also, using maps as keys for a Map view won't work unless the coder for the
map is deterministic. Consider using View.asSingleton() instead, which
doesn't rely on the key coder being deterministic.
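
For the snippet below, a minimal sketch of the singleton alternative
(ruleAndSources is a hypothetical name for the PCollection produced by
your ParDo; types are taken from your code):

    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollectionView;

    // One KV per window, so the view holds a single (rules, sources) pair
    // instead of a map keyed by a non-deterministically encoded Map.
    PCollectionView<KV<Map<String, String>, List<String>>> ruleView =
        ruleAndSources.apply(View.asSingleton());

Inside ProcessRulesFn the pair would then be read with
context.sideInput(ruleView) rather than looked up by map key.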

On Tue, Dec 10, 2019 at 9:58 AM jitendra sharma <ji...@gmail.com>
wrote:

> Hi Alexey,
>
> Thank you very much for your email.
> Here is the sample code I used to create the view, which is passed as a
> side input to the pipeline. Interestingly, the same code works with the
> FlinkRunner, but with the SparkRunner it fails.
>
> private static PCollectionView<Map<Map<String, String>, List<String>>>
>     getCompositeRuleExpressions(
>         int refreshInSeconds,
>         String apiUrl,
>         Pipeline p,
>         String appMappingUrl)
>         throws Exception {
>
>   PCollection<Long> counter =
>       p.apply(GenerateSequence.from(0)
>               .withRate(1, Duration.standardSeconds(1L)))
>           .apply("Apply window",
>               Window.<Long>into(FixedWindows.of(Duration.standardSeconds(2))))
>           .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
>
>   return counter
>       .apply(
>           ParDo.of(
>               new DoFn<Long, KV<Map<String, String>, List<String>>>() {
>                 @ProcessElement
>                 public void process(
>                     @Element Long input,
>                     OutputReceiver<KV<Map<String, String>, List<String>>> o) {
>                   Map<String, String> rules = new HashMap<>();
>                   List<String> listOfSources = new ArrayList<>();
>                   try {
>                     rules = getListOfcompositeUrl(apiUrl);
>                     listOfSources = Utils.getValidSources(appMappingUrl);
>                   } catch (Exception e) {
>                     LOG.error("Exception occurred", e);
>                   }
>                   o.output(KV.of(rules, listOfSources));
>                 }
>               }))
>       .apply(View.asMap());
> }
>
> PCollectionView<Map<Map<String, String>, List<String>>> ruleList =
>     getCompositeRuleExpressions(
>         compositeSegmentRefreshInSeconds, apiUrl, p, appMappingUrl);
>
> record
>     .apply("Apply window",
>         Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
>     .apply(
>         "Process rules for user",
>         ParDo.of(new ProcessRulesFn(ruleList, type, index, url))
>             .withSideInputs(ruleList));
>
>
> Exception Message:
>
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
> Combine.GroupedValues
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
> org.apache.beam.sdk.transforms.MapElements$1@79926285
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
> org.apache.beam.sdk.transforms.View$VoidKeyToMultimapMaterialization$VoidKeyToMultimapMaterializationDoFn@66b5c35c
> [WARNING]
> java.lang.IllegalStateException: No TransformEvaluator registered for
> UNBOUNDED transform View.CreatePCollectionView
>     at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
> (Preconditions.java:588)
>     at
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded
> (StreamingTransformTranslator.java:552)
>     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate
> (SparkRunner.java:456)
>     at
> org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform
> (SparkRunner.java:426)
>     at
> org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform
> (SparkRunner.java:419)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
> (TransformHierarchy.java:665)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
> (TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
> (TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600
> (TransformHierarchy.java:317)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit
> (TransformHierarchy.java:251)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically
> (Pipeline.java:460)
>     at
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call
> (SparkRunnerStreamingContextFactory.java:88)
>     at
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call
> (SparkRunnerStreamingContextFactory.java:46)
>     at
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply
> (JavaStreamingContext.scala:627)
>
>
> Regards,
>
> Jitendra
>
> On Tue, Dec 10, 2019 at 11:07 PM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
>
>> Hi Jitendra,
>>
>> Could you give more details about your pipeline? Is it possible to share
>> the code of this pipeline?
>>
>> > On 5 Dec 2019, at 17:50, jitendra sharma <ji...@gmail.com>
>> wrote:
>> >
>> > Hi,
>> >
>> > I am running a Beam job using the Spark Runner and getting the error below:
>> >
>> > 19/12/05 22:14:36 WARN UnboundedDataset: Provided StorageLevel:
>> MEMORY_ONLY is ignored for streams, using the default level:
>> StorageLevel(memory, 1 replicas)
>> > [WARNING]
>> > java.lang.IllegalStateException: No TransformEvaluator registered for
>> UNBOUNDED transform View.CreatePCollectionView
>> >     at
>> org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions.checkState
>> (Preconditions.java:518)
>> >     at
>> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded
>> (StreamingTransformTranslator.java:553)
>> >     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate
>> (SparkRunner.java:464)
>> >
>> > Any ideas or help would be appreciated.
>> >
>> >
>> > Regards,
>> > Jitendra Sharma
>>
>>
>
> --
> Jitendra Sharma
>

Re: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView

Posted by jitendra sharma <ji...@gmail.com>.
Hi Alexey,

Thank you very much for your email.
Here is the sample code I used to create the view, which is passed as a
side input to the pipeline. Interestingly, the same code works with the
FlinkRunner, but with the SparkRunner it fails.

private static PCollectionView<Map<Map<String, String>, List<String>>>
    getCompositeRuleExpressions(
        int refreshInSeconds,
        String apiUrl,
        Pipeline p,
        String appMappingUrl)
        throws Exception {

  PCollection<Long> counter =
      p.apply(GenerateSequence.from(0)
              .withRate(1, Duration.standardSeconds(1L)))
          .apply("Apply window",
              Window.<Long>into(FixedWindows.of(Duration.standardSeconds(2))))
          .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());

  return counter
      .apply(
          ParDo.of(
              new DoFn<Long, KV<Map<String, String>, List<String>>>() {
                @ProcessElement
                public void process(
                    @Element Long input,
                    OutputReceiver<KV<Map<String, String>, List<String>>> o) {
                  Map<String, String> rules = new HashMap<>();
                  List<String> listOfSources = new ArrayList<>();
                  try {
                    rules = getListOfcompositeUrl(apiUrl);
                    listOfSources = Utils.getValidSources(appMappingUrl);
                  } catch (Exception e) {
                    LOG.error("Exception occurred", e);
                  }
                  o.output(KV.of(rules, listOfSources));
                }
              }))
      .apply(View.asMap());
}

PCollectionView<Map<Map<String, String>, List<String>>> ruleList =
    getCompositeRuleExpressions(
        compositeSegmentRefreshInSeconds, apiUrl, p, appMappingUrl);

record
    .apply("Apply window",
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
    .apply(
        "Process rules for user",
        ParDo.of(new ProcessRulesFn(ruleList, type, index, url))
            .withSideInputs(ruleList));


Exception Message:

19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
Combine.GroupedValues
19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
org.apache.beam.sdk.transforms.MapElements$1@79926285
19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating
org.apache.beam.sdk.transforms.View$VoidKeyToMultimapMaterialization$VoidKeyToMultimapMaterializationDoFn@66b5c35c
[WARNING]
java.lang.IllegalStateException: No TransformEvaluator registered for
UNBOUNDED transform View.CreatePCollectionView
    at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
(Preconditions.java:588)
    at
org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded
(StreamingTransformTranslator.java:552)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate
(SparkRunner.java:456)
    at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform
(SparkRunner.java:426)
    at
org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform
(SparkRunner.java:419)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:665)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit
(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600
(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit
(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically
(Pipeline.java:460)
    at
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call
(SparkRunnerStreamingContextFactory.java:88)
    at
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call
(SparkRunnerStreamingContextFactory.java:46)
    at
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply
(JavaStreamingContext.scala:627)


Regards,

Jitendra

On Tue, Dec 10, 2019 at 11:07 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> Hi Jitendra,
>
> Could you give more details about your pipeline? Is it possible to share
> the code of this pipeline?
>
> > On 5 Dec 2019, at 17:50, jitendra sharma <ji...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > I am running a Beam job using the Spark Runner and getting the error below:
> >
> > 19/12/05 22:14:36 WARN UnboundedDataset: Provided StorageLevel:
> MEMORY_ONLY is ignored for streams, using the default level:
> StorageLevel(memory, 1 replicas)
> > [WARNING]
> > java.lang.IllegalStateException: No TransformEvaluator registered for
> UNBOUNDED transform View.CreatePCollectionView
> >     at
> org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions.checkState
> (Preconditions.java:518)
> >     at
> org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded
> (StreamingTransformTranslator.java:553)
> >     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate
> (SparkRunner.java:464)
> >
> > Any ideas or help would be appreciated.
> >
> >
> > Regards,
> > Jitendra Sharma
>
>

-- 
Jitendra Sharma

Re: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Jitendra,

Could you give more details about your pipeline? Is it possible to share the code of this pipeline?

> On 5 Dec 2019, at 17:50, jitendra sharma <ji...@gmail.com> wrote:
> 
> Hi,
> 
> I am running a Beam job using the Spark Runner and getting the error below:
> 
> 19/12/05 22:14:36 WARN UnboundedDataset: Provided StorageLevel: MEMORY_ONLY is ignored for streams, using the default level: StorageLevel(memory, 1 replicas)
> [WARNING]
> java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView
>     at org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions.checkState (Preconditions.java:518)
>     at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded (StreamingTransformTranslator.java:553)
>     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate (SparkRunner.java:464)
> 
> Any ideas or help would be appreciated.
> 
> 
> Regards,
> Jitendra Sharma