Posted to commits@beam.apache.org by "tianyou (JIRA)" <ji...@apache.org> on 2017/04/12 06:55:41 UTC

[jira] [Commented] (BEAM-1789) window can't not use in spark cluster module

    [ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15965466#comment-15965466 ] 

tianyou commented on BEAM-1789:
-------------------------------

[~amitsela] Hello, did you test the above code in Spark cluster mode? I tested it in several ways (spark-submit and debugging in Eclipse), and it doesn't work well.

> window can't not use in spark cluster module
> --------------------------------------------
>
>                 Key: BEAM-1789
>                 URL: https://issues.apache.org/jira/browse/BEAM-1789
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: tianyou
>            Assignee: Amit Sela
>
> I use Beam on a Spark cluster. The application is below.
> SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
>         options.setRunner(SparkRunner.class);
>         options.setEnableSparkMetricSinks(false);
>         options.setStreaming(true);
>         options.setSparkMaster("spark://10.100.124.205:6066");
>         options.setAppName("Beam App Spark"+new Random().nextFloat());
>         options.setJobName("Beam Job Spark"+new Random().nextFloat());
>         System.out.println("App Name:"+options.getAppName());
>         System.out.println("Job Name:"+options.getJobName());
>         options.setMaxRecordsPerBatch(100000L);
>         
> //      PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline p = Pipeline.create(options);
>         
> //      Duration size = Duration.standardMinutes(4);
>         long duration = 60;
>         if(args!=null && args.length==1){
>             duration = Integer.valueOf(args[0]);
>         }
>         Duration size = Duration.standardSeconds(duration);
>         System.out.println("Time window: ["+duration+"] seconds");
>         Window.Bound<KV<String,String>> fixWindow = Window.<KV<String,String>> into(
>             FixedWindows.of(size)
>         );
>         
>         String kafkaAddress = "10.100.124.208:9093";
> //      String kafkaAddress = "192.168.100.212:9092";
>         
>         Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
>         kfConsunmerConf.put("auto.offset.reset", "latest");
>         PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String> read()
>             .withBootstrapServers(kafkaAddress)
>             .withTopics(ImmutableList.of("wypxx1"))
>             .withKeyCoder(StringUtf8Coder.of()) 
>             .withValueCoder(StringUtf8Coder.of())
>             .updateConsumerProperties(kfConsunmerConf)
>             .withoutMetadata() 
>         ).apply(Values.<String> create());
>         
>         
>         PCollection<KV<String,String>> totalPc = kafkaJsonPc.apply(
>                 "count line",
>                 ParDo.of(new DoFn<String,KV<String,String>>() {
>                     @ProcessElement
>                       public void processElement(ProcessContext c) {
>                         String line = c.element();
>                         Instant is = c.timestamp();
>                         if(line.length()>2)
>                           line = line.substring(0,2);
>                         System.out.println(line + " " +  is.toString());
>                         c.output(KV.of(line, line));
>                       }
>                  })
>             );
>             
>         
>         PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
>                 "group by appKey",
>                 GroupByKey.<String, String>create()
>             );
>           itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     KV<String, Iterable<String>> keyIt = c.element();
>                     String key = keyIt.getKey();
>                     Iterable<String> itb = keyIt.getValue();
>                     Iterator<String> it = itb.iterator();
>                     StringBuilder sb = new StringBuilder();
>                     sb.append(key).append(":[");
>                     while(it.hasNext()){
>                         sb.append(it.next()).append(",");
>                     }
>                     String str = sb.toString();
>                     str = str.substring(0,str.length() -1) + "]";
>                     System.out.println(str);
>                     String filePath = "/data/wyp/sparktest.txt";
>                     String line = "word-->["+key+"]total count="+str+"--->time+"+c.timestamp().toString();
>                     System.out.println("writefile----->"+line);
>                     FileUtil.write(filePath, line, true, true);
>                 }
>                 
>              }));
>           
>             p.run().waitUntilFinish();
> When I submit the application to the Spark cluster, I can see the log output of the totalPc PCollection in the Spark UI, but even after one minute I can't see any log output from the itPc PCollection.
> In local Spark mode it works well.
> Please help me resolve this problem. Thanks!
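
For reference, the grouping the report expects from FixedWindows.of(size) can be sketched without Beam or Spark. Each element falls into the window that starts at its timestamp rounded down to a multiple of the window size, and with the default trigger the grouped output for a window is expected only after that window has ended, which is why the itPc log should appear after about one minute. The names below (FixedWindowSketch, windowStart, groupByWindow) are hypothetical. The code is an approximation for illustration, not Beam's implementation, and it does not reproduce the cluster-mode problem.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not Beam's FixedWindows implementation.
class FixedWindowSketch {

    // Start (in millis) of the fixed window that contains the given timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Buckets values by the start of the window their timestamp falls into.
    static Map<Long, List<String>> groupByWindow(long[] timestamps, String[] values, long sizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 60_000L; // 60 seconds, the default duration in the report
        long[] timestamps = {1_000L, 59_000L, 61_000L}; // made-up sample timestamps
        String[] values = {"ab", "ab", "cd"};           // made-up two-character keys
        groupByWindow(timestamps, values, size).forEach((start, vals) ->
            System.out.println("window [" + start + ", " + (start + size) + ") -> " + vals));
    }
}
```

Here the first two sample elements share the window starting at 0 and the third lands in the next one, so two groups would be expected, one per window.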



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)