Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 17:20:12 UTC

[GitHub] [beam] kennknowles opened a new issue, #18305: can't use window in spark cluster module

kennknowles opened a new issue, #18305:
URL: https://github.com/apache/beam/issues/18305

    I use Beam in a Spark cluster. The application is below.
   SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
           options.setRunner(SparkRunner.class);
           options.setEnableSparkMetricSinks(false);
           options.setStreaming(true);
           options.setSparkMaster("spark://10.100.124.205:6066");
           options.setAppName("Beam App Spark" + new Random().nextFloat());
           options.setJobName("Beam Job Spark" + new Random().nextFloat());
           System.out.println("App Name:" + options.getAppName());
           System.out.println("Job Name:" + options.getJobName());
           options.setMaxRecordsPerBatch(100000L);
           
   //      PipelineOptions options = PipelineOptionsFactory.create();
           Pipeline p = Pipeline.create(options);
           
   //      Duration size = Duration.standardMinutes(4);
           long duration = 60;
           if (args != null && args.length == 1) {
               duration = Integer.valueOf(args[0]);
           }
           Duration size = Duration.standardSeconds(duration);
           System.out.println("时间窗口为:["****duration****"]秒");
           Window.Bound<KV<String, String>> fixWindow = Window.<KV<String, String>>into(
               FixedWindows.of(size)
           );
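           // Note: no explicit trigger or allowed lateness is configured here,
           // so this window uses Beam's default trigger, which fires only once
           // the watermark passes the end of the window.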
           
           String kafkaAddress = "10.100.124.208:9093";
   //      String kafkaAddress = "192.168.100.212:9092";
           
           Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
           kfConsunmerConf.put("auto.offset.reset", "latest");
           PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
               .withBootstrapServers(kafkaAddress)
               .withTopics(ImmutableList.of("wypxx1"))
               .withKeyCoder(StringUtf8Coder.of()) 
               .withValueCoder(StringUtf8Coder.of())
               .updateConsumerProperties(kfConsunmerConf)
               .withoutMetadata() 
           ).apply(Values.<String>create());
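           // Note: no withTimestampFn/withWatermarkFn is set on the KafkaIO
           // read, so element timestamps and the watermark fall back to the
           // connector's defaults.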
           
           
           PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
                   "count line",
                   ParDo.of(new DoFn<String, KV<String, String>>() {
                       @ProcessElement
                         public void processElement(ProcessContext c) {
                           String line = c.element();
                           Instant is = c.timestamp();
                           if (line.length() > 2)
                             line = line.substring(0, 2);
                           System.out.println(line + " " + is.toString());
                           c.output(KV.of(line, line));
                         }
                    })
               );
               
           
           PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
                   "group by appKey",
                   GroupByKey.<String, String>create()
               );
             itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                       KV<String, Iterable<String>> keyIt = c.element();
                       String key = keyIt.getKey();
                       Iterable<String> itb = keyIt.getValue();
                       Iterator<String> it = itb.iterator();
                       StringBuilder sb = new StringBuilder();
                       sb.append(key).append(":[");
                       while(it.hasNext()){
                           sb.append(it.next()).append(",");
                       }
                       String str = sb.toString();
   
                       str = str.substring(0, str.length() - 1) + "]";
                       System.out.println(str);
                       String filePath = "/data/wyp/sparktest.txt";
                       String line = "word\--\>["****key****"]total count="****str****"\---\>time****"****c.timestamp().toString();
                       System.out.println("writefile\--\---\>"****line);
                       FileUtil.write(filePath, line, true, true);
                   }
                   
                }));
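             // Note: on a cluster, this DoFn runs on the Spark executors, so
             // the System.out.println output and the file written by
             // FileUtil.write land on the worker machines, not on the driver
             // that submitted the job.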
             
               p.run().waitUntilFinish();
   
    When I submit the application to the Spark cluster, I can see the log output of the totalPc PCollection in the Spark UI, but even after one minute I can't see any log output from the itPc PCollection.
    When I use Spark in local mode, it works well.
    Please help me resolve this problem. Thanks!
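
    For reference, one thing worth ruling out is the window's trigger configuration. A minimal sketch of the same fixed window with an explicit trigger and allowed lateness is below; the 30-second early-firing delay and one-minute allowed lateness are illustrative values, not taken from the report above.

        import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
        import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
        import org.apache.beam.sdk.transforms.windowing.FixedWindows;
        import org.apache.beam.sdk.transforms.windowing.Window;
        import org.apache.beam.sdk.values.KV;
        import org.joda.time.Duration;

        // Same fixed window as fixWindow above, but with an explicit trigger:
        // fire early on processing time 30 seconds after the first element in
        // a pane, then again when the watermark passes the end of the window;
        // discard each pane's contents after it fires.
        Window.Bound<KV<String, String>> triggeredWindow =
            Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(60)))
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(30))))
                .withAllowedLateness(Duration.standardMinutes(1))
                .discardingFiredPanes();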
   
   
   Imported from Jira [BEAM-1789](https://issues.apache.org/jira/browse/BEAM-1789). Original Jira may contain additional context.
   Reported by: tianyou.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org