Posted to user@beam.apache.org by "zhenglin.Tian" <zh...@cafintech.com> on 2017/05/04 02:37:10 UTC

Re: A problem about additional outputs

Hi, I have a problem with additional outputs when using the SparkRunner.
Here is my code. When I use the DirectRunner everything runs fine, but when I replace the DirectRunner with the SparkRunner the code doesn't run normally.

public class UnifiedDataExtraction {
   
    private static TupleTag<String> rawDataTag = new TupleTag<String>() {
    };
    
    private static TupleTag<String> exceptionTag = new TupleTag<String>() {
    };

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", ConstantsOwn.HADOOP_HOME);

        SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
        options.setSparkMaster(ConstantsOwn.SPARK_MASTER);
        options.setRunner(SparkRunner.class);
//        options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName(ConstantsOwn.SPARK_APPNAME);
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);
        Pipeline p = Pipeline.create(options);


        List<String> topics = Arrays.asList(ConstantsOwn.KAFKA_TOPIC_ANTIFRAUD.split(","));

        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers(ConstantsOwn.KAFKA_ADDRESS)
                .withTopics(topics)
                //.withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withKeyDeserializer(VoidDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata()
        ).apply(Values.<String>create());
       
        rawData.apply(ParDo.of(SimpleViewDoFn.of(true))); // simply prints each element of rawData; runs normally   ①
        PCollectionTuple results = rawData.apply("logAnatomyTest",   //   ②
                ParDo.of(
                        new DoFn<String, String>() {
                            @ProcessElement
                            public void process(ProcessContext c) {
                                String element = c.element();
                                System.out.println("===="+element);
                                // everything except "EOF" is forwarded to the main output (rawDataTag);
                                // nothing is emitted to the additional output (exceptionTag)
                                if (!element.equals("EOF")) {
                                    c.output(element);
                                }
                            }
                        }
                ).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag))
        );
        p.run().waitUntilFinish();
   }
}

In the code above, the part marked ① runs normally, but from ② I can't get any output.
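
For reference, this is roughly what I expect to be able to do with the tagged outputs once ② works (a minimal sketch reusing the same two tags; the step name and the routing condition are only illustrative):

// Route each element either to the main output or to the additional output,
// then read both back from the returned PCollectionTuple.
PCollectionTuple results = rawData.apply("routeByTag",
        ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void process(ProcessContext c) {
                String element = c.element();
                if (element.startsWith("ERROR")) {       // illustrative condition only
                    c.output(exceptionTag, element);     // additional output
                } else {
                    c.output(element);                   // main output (rawDataTag)
                }
            }
        }).withOutputTags(rawDataTag, TupleTagList.of(exceptionTag)));

PCollection<String> mainOutput = results.get(rawDataTag);    // elements tagged rawDataTag
PCollection<String> exceptions = results.get(exceptionTag);  // elements tagged exceptionTag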

Here are my Beam dependencies:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.7.0-SNAPSHOT</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-spark</artifactId>
     <version>0.7.0-SNAPSHOT</version>
</dependency>


Could someone please help me?



On 2017/4/28 4:43:23, Aviem Zur <av...@gmail.com> wrote:
Yes. Spark streaming support is still experimental and this issue exists in Beam 0.6.0

This has since been fixed and the fix will be a part of the upcoming release.

Since this isn't the first time a user has encountered this, I've created a JIRA ticket for better visibility of this issue: https://issues.apache.org/jira/browse/BEAM-2106

Thanks for reaching out! Please feel free to try out your pipeline using the Beam master branch or one of the nightly SNAPSHOT builds.

On Thu, Apr 27, 2017 at 9:58 AM 4498237@qq <4498237@qq.com> wrote:

Here is my maven configuration, thank you.

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>0.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>0.6.0</version>
  <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>0.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>0.6.0</version>
</dependency>


On 26 Apr 2017, at 6:58 PM, Aviem Zur <aviemzur@gmail.com> wrote:

Hi,

Can you please share which version of Beam you are using?

On Wed, Apr 26, 2017 at 1:51 PM 4498237@qq <4498237@qq.com> wrote:

Hi, here is my program using additional outputs with Apache Beam, and the result:
public class DataExtraction2 {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "C://hadoop/hadoop-2.6.1");
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setSparkMaster("local[4]");
//        options.setCheckpointDir("./checkpoint");
        options.setRunner(SparkRunner.class);
//        options.setRunner(DirectRunner.class);
        options.setStorageLevel("MEMORY_ONLY");
        options.setAppName("testMavenDependency");
        options.setBatchIntervalMillis(1000L);
        options.setEnableSparkMetricSinks(false);
        Pipeline p = Pipeline.create(options);
        List<String> topics = Arrays.asList("beamOnSparkTest".split(","));
    
        final TupleTag<String> rawDataTag = new TupleTag<String>() {
        };
     
        final TupleTag<String> exceptionTag = new TupleTag<String>() {
        };
        final TupleTag<String> riskEventLogTag = new TupleTag<String>() {
        };
        final TupleTag<String> statisticsTag = new TupleTag<String>() {
        };
        final TupleTag<String> errorTargetLogTag = new TupleTag<String>() {
        };
        final TupleTag<String> equipmentLogTag = new TupleTag<String>() {
        };
        final TupleTag<String> performanceLogTag = new TupleTag<String>() {
        };
        PCollection<String> rawData = p.apply(KafkaIO.<Void, String>read()
                .withBootstrapServers("172.17.1.138:9092,172.17.1.137:9092")
                .withTopics(topics)
                .withConsumerFactoryFn(new CafintechConsumerFactoryFn())
                .withKeyCoder(VoidCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                .withoutMetadata()
        ).apply(Values.<String>create());
        PCollectionTuple results = rawData.apply(
                ParDo.withOutputTags(rawDataTag,
                        TupleTagList.of(exceptionTag)
                                .and(riskEventLogTag)
                                .and(statisticsTag)
                                .and(errorTargetLogTag)
                                .and(equipmentLogTag)
                                .and(performanceLogTag))
                        .of(new DoFn<String, String>() {
                            @ProcessElement
                            public void processElement(ProcessContext c) {
                                String idCoop = "";
                                int eventType = 0;
                                int osPlatformType = -1;
                                String innerDecision = "";
                                String outterDecision = "";
                                // Date appTime = new Date();
                                String eventId = "";
                                //String strategyList = "";
                                String uuid = "";
                                String phoneNo = "";
                                int equipmentType = -1;
                                int antiFraudTime = -1;
                                ......
                            }
                        }));
        p.run().waitUntilFinish();
    }
}
When I run this program, I get:
.....
....
2017-04-26 15:06:13,077 [pool-1-thread-1] [org.apache.spark.streaming.StreamingContext] [ERROR] - Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.beam.runners.spark.translation.EvaluationContext
Serialization stack:
- object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@2807813e)
- field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext)
- object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@560cd8a8)
- field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, <function1>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
- field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
- field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@3ea9e1e5)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.InternalMapWithStateDStream, org.apache.spark.streaming.dstream.InternalMapWithStateDStream@23ab764d)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.FilteredDStream, org.apache.spark.streaming.dstream.FilteredDStream@5bbb0240)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.MapWithStateDStreamImpl, org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@24211bca)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
...
....

If there is only one main output, the program works fine.
Can you tell me why?

Re: A problem about additional outputs

Posted by "zhenglin.Tian" <zh...@cafintech.com>.
Hi Aviem Zur,
I have an issue with Apache Beam on the SparkRunner. I read data from Kafka, run some logic, and then get this exception:

    2017-05-10 19:02:11,729 [task-result-getter-3] [org.apache.spark.scheduler.TaskSetManager] [ERROR] - Task 0.0 in stage 2341.0 (TID 2227) had a not serializable result: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow
Serialization stack:
- object not serializable (class: org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow, value: TimestampedValueInGlobalWindow{value={"et":0,"idCooper":"83babd4973c24c9facadaa1d1b4e6aa2","supplyParams":[{"name":"loc","val":"39.915832,116.428342"},{"name":"amount","val":5001}

My Apache Beam dependencies are:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.7.0-SNAPSHOT</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>
<dependency>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-spark</artifactId>
     <version>0.7.0-SNAPSHOT</version>
</dependency>


Re: A problem about additional outputs

Posted by "zhenglin.Tian" <zh...@cafintech.com>.
Yes, when I tried to get the tag from the result I encountered the NullPointerException.
I pulled the latest version and rebuilt my project, and now it runs fine.
Thank you!



Re: A problem about additional outputs

Posted by Aviem Zur <av...@gmail.com>.
By "cannot run normally" do you mean you get an exception? We recently had
a bug on master in which streaming pipelines containing `ParDo` with
multiple outputs ran into `NullPointerException`. This was fixed here:
https://issues.apache.org/jira/browse/BEAM-2029
Is this what you're facing? If so does pulling master and rebuilding help?
