Posted to user@beam.apache.org by Mahender Devaruppala <ma...@apporchid.com> on 2017/08/30 21:45:22 UTC

Apache Beam v2.1.0 - Spark Runner Issue

Hello,

I am running into a Spark assertion error when running an Apache Beam pipeline; details below:

Apache Beam version: 2.1.0
Spark version: 2.1.0

Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
               at scala.Predef$.assert(Predef.scala:179)
               at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
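The assertion in the trace above is raised by Spark's AccumulatorV2.writeReplace, which requires that copyAndReset() return an accumulator for which isZero() is true. The following is a minimal plain-Java sketch of that contract (SimpleAccumulator and ListAccumulator here are illustrative stand-ins, not Spark's actual classes):

```java
// Simplified stand-in for the org.apache.spark.util.AccumulatorV2 contract:
// copyAndReset() must return a copy for which isZero() is true.
abstract class SimpleAccumulator<T> {
    abstract boolean isZero();
    abstract SimpleAccumulator<T> copyAndReset();

    // Mirrors the check in AccumulatorV2.writeReplace that produces the error above.
    final void checkBeforeSerialize() {
        SimpleAccumulator<T> copy = copyAndReset();
        assert copy.isZero() : "assertion failed: copyAndReset must return a zero value copy";
    }
}

class ListAccumulator<T> extends SimpleAccumulator<T> {
    private final java.util.List<T> values = new java.util.ArrayList<>();

    void add(T value) { values.add(value); }

    @Override boolean isZero() { return values.isEmpty(); }

    // Correct: return a fresh, empty accumulator, not a copy of the current state.
    @Override SimpleAccumulator<T> copyAndReset() { return new ListAccumulator<>(); }
}
```

An implementation whose copyAndReset() returned the current, non-empty state would fail this check the first time the accumulator is serialized to an executor.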

Can you please let me know whether the Apache Beam v2.1.0 Spark runner is compatible with Spark v2.1.0?

Below is the code snippet for the pipeline:

PipelineOptionsFactory.register(CSVOptions.class);
CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[4]");
options.setEnableSparkMetricSinks(false);

Pipeline p = Pipeline.create(options);
p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
    .apply(new DataLoader())
    .apply(JdbcIO.<String>write()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
                .withUsername("postgres").withPassword("postgres"))
        .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
        .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
            public void setParameters(String element, PreparedStatement query) throws SQLException {
                String[] datas = element.split("\t");
                if (datas.length > 0) {
                    for (int j = 0; j < datas.length; j++) {
                        query.setString(j + 1, datas[j]);
                    }
                }
            }
        }));

SparkRunner runner = SparkRunner.create(options);
runner.run(p).waitUntilFinish();
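Independent of the runner question, one detail in the setter above is worth double-checking: the insert statement declares 16 placeholders, but Java's element.split("\t") drops trailing empty fields, so a row ending in empty columns yields fewer values than placeholders and leaves some parameters unset. A defensive sketch (the toParameters helper and the 16-column count are illustrative, taken from the statement above):

```java
class TsvParams {
    // Split a tab-separated line into exactly `columns` JDBC parameter values.
    // split("\t", -1) keeps trailing empty fields (plain split() drops them);
    // fields beyond the end of the input become null.
    static String[] toParameters(String element, int columns) {
        String[] fields = element.split("\t", -1);
        String[] params = new String[columns];
        for (int j = 0; j < columns; j++) {
            params[j] = j < fields.length ? fields[j] : null;
        }
        return params;
    }
}
```

Calling query.setString(j + 1, params[j]) over exactly 16 entries then always matches the placeholder count, with null for any missing field.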


Any help would be greatly appreciated.

Thanks,
Mahender


RE: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Mahender Devaruppala <ma...@apporchid.com>.
Thanks very much JB, will look forward to your note. 

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net] 
Sent: Friday, September 1, 2017 1:54 AM
To: user@beam.apache.org
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Sure, I will send the PR during the weekend. I will let you know.

Regards
JB

On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> Thanks JB.  Could you please point me to the location of the Spark Runner specific to Spark 2.x, or is this part of some configuration?
> 
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> Sent: Thursday, August 31, 2017 12:11 AM
> To: user@beam.apache.org
> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> 
> Hi,
> 
> I'm working on a Spark runner specific to Spark 2.x as the API changed.
> 
> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
> 
> Regards
> JB
> 
> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>> Hello,
>>
>> I am running into spark assertion error when running a apache pipeline and below are the details:
>>
>> Apache Beam version: 2.1.0
>> Spark version: 2.1.0
>>
>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>     at scala.Predef$.assert(Predef.scala:179)
>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>
>> Can you please let me know if Apache beam v2.1.0 Spark runner is compatible to work with Spark v2.1.0?
>>
>> Below is the code snippet for the pipeline:
>>
>> PipelineOptionsFactory.register(CSVOptions.class);
>> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>> options.setRunner(SparkRunner.class);
>> options.setSparkMaster("local[4]");
>> options.setEnableSparkMetricSinks(false);
>> Pipeline p = Pipeline.create(options);
>> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>     .apply(new DataLoader())
>>     .apply(JdbcIO.<String>write()
>>         .withDataSourceConfiguration(
>>             JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>                 .withUsername("postgres").withPassword("postgres"))
>>         .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>             public void setParameters(String element, PreparedStatement query) throws SQLException {
>>                 String[] datas = element.split("\t");
>>                 if (datas.length > 0) {
>>                     for (int j = 0; j < datas.length; j++) {
>>                         query.setString(j + 1, datas[j]);
>>                     }
>>                 }
>>             }
>>         }));
>> SparkRunner runner = SparkRunner.create(options);
>> runner.run(p).waitUntilFinish();
>>
>> Any help would be greatly appreciated.
>>
>> Thanks,
>> Mahender
>>
> 
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Thanks for the update. I'm also making the change (I'm refactoring RedisIO right now; I'll get back to the Spark 2 runner just after).

Regards
JB

On 09/15/2017 04:16 PM, sureshkumarvepari@gmail.com wrote:
> 
> 
> On 2017-09-11 09:53, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>> I'm planning to resume my work on it. About the Accumulator, I think it's OK,
>> but I will double check.
>>
>> I will keep you posted.
>>
>> Regards
>> JB
>>
>> On 09/10/2017 08:48 AM, sureshkumarvepari@gmail.com wrote:
>>>
>>>
>>> On 2017-09-08 11:39, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>>>> Hmmm weird, let me check.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 09/07/2017 09:03 PM, sureshkumarvepari@gmail.com wrote:
>>>>>
>>>>>
>>>>> On 2017-09-05 19:01, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>>>>>> Hi guys
>>>>>>
>>>>>> I created PR #3808:
>>>>>>
>>>>>> https://github.com/apache/beam/pull/3808
>>>>>>
>>>>>> It introduces a new Spark 2.x-specific runner (in addition to the Spark 1.x one).
>>>>>>
>>>>>> I have some questions (more for dev) that I left in a comment.
>>>>>>
>>>>>> It's still a work in progress, as I have to fix the unit tests and discuss the validate-runner tests. However, you can already take a look.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
>>>>>>> Sure, thanks very much JB, will look forward to your link.
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>>>>>> Sent: Monday, September 4, 2017 8:59 AM
>>>>>>> To: user@beam.apache.org
>>>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Actually, I'm preparing the PR. I will send the PR link to you.
>>>>>>>
>>>>>>> Give me just some time to rebase my branch and push.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
>>>>>>>> Hi JB,
>>>>>>>>
>>>>>>>> If possible, could you please send me the code/location to download Spark Runner for Spark 2.x?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Mahender
>>>>>>>>
>>>>>>>> -----Original Message-----
>>>>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>>>>>>> Sent: Friday, September 1, 2017 1:54 AM
>>>>>>>> To: user@beam.apache.org
>>>>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>>>>>>
>>>>>>>> Sure, I will send the PR during the weekend. I will let you know.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> JB
>>>>>>>>
>>>>>>>> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
>>>>>>>>> Thanks JB.  Could you please point me to the location of Spark Runner specific to Spark 2.x or is this something part any configurations?
>>>>>>>>>
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>>>>>>>> Sent: Thursday, August 31, 2017 12:11 AM
>>>>>>>>> To: user@beam.apache.org
>>>>>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm working on a Spark runner specific to Spark 2.x as the API changed.
>>>>>>>>>
>>>>>>>>> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>>
>>>>>>>>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> I am running into spark assertion error when running a apache pipeline and below are the details:
>>>>>>>>>>
>>>>>>>>>> Apache Beam version: 2.1.0
>>>>>>>>>> Spark version: 2.1.0
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>>>>>>>>>     at scala.Predef$.assert(Predef.scala:179)
>>>>>>>>>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>>>>
>>>>>>>>>> Can you please let me know if Apache beam v2.1.0 Spark runner is compatible to work with Spark v2.1.0?
>>>>>>>>>>
>>>>>>>>>> Below is the code snippet for the pipeline:
>>>>>>>>>>
>>>>>>>>>> PipelineOptionsFactory.register(CSVOptions.class);
>>>>>>>>>> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>>>>>>>>>> options.setRunner(SparkRunner.class);
>>>>>>>>>> options.setSparkMaster("local[4]");
>>>>>>>>>> options.setEnableSparkMetricSinks(false);
>>>>>>>>>> Pipeline p = Pipeline.create(options);
>>>>>>>>>> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>>>>>>>>>     .apply(new DataLoader())
>>>>>>>>>>     .apply(JdbcIO.<String>write()
>>>>>>>>>>         .withDataSourceConfiguration(
>>>>>>>>>>             JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>>>>>>>>>                 .withUsername("postgres").withPassword("postgres"))
>>>>>>>>>>         .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>>>>>>>>>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>>>>>>>>>             public void setParameters(String element, PreparedStatement query) throws SQLException {
>>>>>>>>>>                 String[] datas = element.split("\t");
>>>>>>>>>>                 if (datas.length > 0) {
>>>>>>>>>>                     for (int j = 0; j < datas.length; j++) {
>>>>>>>>>>                         query.setString(j + 1, datas[j]);
>>>>>>>>>>                     }
>>>>>>>>>>                 }
>>>>>>>>>>             }
>>>>>>>>>>         }));
>>>>>>>>>> SparkRunner runner = SparkRunner.create(options);
>>>>>>>>>> runner.run(p).waitUntilFinish();
>>>>>>>>>>
>>>>>>>>>> Any help would be greatly appreciated.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Mahender
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>> jbonofre@apache.org
>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jean-Baptiste Onofré
>>>>>>>> jbonofre@apache.org
>>>>>>>> http://blog.nanthrax.net
>>>>>>>> Talend - http://www.talend.com
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbonofre@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>>>>>
>>>>>>
>>>>>> -- 
>>>>>> Jean-Baptiste Onofré
>>>>>> jbonofre@apache.org
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
>>>>>> Hi JB,
>>>>> I have tried this PR and I still see the same error, i.e. "Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy".
>>>>> Is the change from Iterables to Iterators supposed to fix this issue, or am I missing something else? Please confirm.
>>>>>
>>>>> Thanks in Advance.
>>>>> Suri
>>>>>
>>>>
>>>> -- 
>>>> Jean-Baptiste Onofré
>>>> jbonofre@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>>> Hi JB,
>>> As discussed here: https://stackoverflow.com/questions/42336251/assertionerror-assertion-failed-copyandreset-must-return-a-zero-value-copy and https://github.com/bigdatagenomics/adam/issues/1021#issuecomment-216283222
>>>
>>> Does spark-core need to be changed? Can you please confirm.
>>>
>>> Thanks in advance,
>>> Suri
>>>
>>
>> -- 
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
> Hi JB,
> your changes from PR #3808 (https://github.com/apache/beam/pull/3808), together with the AccumulatorV2 changes in spark-core discussed above, worked (i.e. the assertion is avoided).
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by "sureshkumarvepari@gmail.com" <su...@gmail.com>.

On 2017-09-11 09:53, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote: 
> I'm planning to resume my work on it. About the Accumulator, I think it's OK, 
> but I will double check.
> 
> I will keep you posted.
> 
> Regards
> JB
> 
> On 09/10/2017 08:48 AM, sureshkumarvepari@gmail.com wrote:
> > 
> > 
> > On 2017-09-08 11:39, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
> >> Hmmm weird, let me check.
> >>
> >> Regards
> >> JB
> >>
> >> On 09/07/2017 09:03 PM, sureshkumarvepari@gmail.com wrote:
> >>>
> >>>
> >>> On 2017-09-05 19:01, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
> >>>> Hi guys
> >>>>
> >>>> I created PR #3808:
> >>>>
> >>>> https://github.com/apache/beam/pull/3808
> >>>>
> >>>> It introduces a new Spark 2.x specific runner (in addition of the Spark 1.x one).
> >>>>
> >>>> I have some questions (more for dev) that I let in a comment.
> >>>>
> >>>> It's still a work in progress as I have to fix unit tests and discuss about the
> >>>> validate runner test. However you can already take a look.
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
> >>>>> Sure, thanks very much JB, will look forward to your link.
> >>>>>
> >>>>> -----Original Message-----
> >>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> >>>>> Sent: Monday, September 4, 2017 8:59 AM
> >>>>> To: user@beam.apache.org
> >>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> Actually, I'm preparing the PR. I will send the PR link to you.
> >>>>>
> >>>>> Give me just some time to rebase my branch and push.
> >>>>>
> >>>>> Thanks,
> >>>>> Regards
> >>>>> JB
> >>>>>
> >>>>> On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
> >>>>>> Hi JB,
> >>>>>>
> >>>>>> If possible, could you please send me the code/location to download Spark Runner for Spark 2.x?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Mahender
> >>>>>>
> >>>>>> -----Original Message-----
> >>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> >>>>>> Sent: Friday, September 1, 2017 1:54 AM
> >>>>>> To: user@beam.apache.org
> >>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>>>>
> >>>>>> Sure, I will send the PR during the weekend. I will let you know.
> >>>>>>
> >>>>>> Regards
> >>>>>> JB
> >>>>>>
> >>>>>> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> >>>>>>> Thanks JB.  Could you please point me to the location of Spark Runner specific to Spark 2.x or is this something part any configurations?
> >>>>>>>
> >>>>>>> -----Original Message-----
> >>>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> >>>>>>> Sent: Thursday, August 31, 2017 12:11 AM
> >>>>>>> To: user@beam.apache.org
> >>>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I'm working on a Spark runner specific to Spark 2.x as the API changed.
> >>>>>>>
> >>>>>>> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
> >>>>>>>
> >>>>>>> Regards
> >>>>>>> JB
> >>>>>>>
> >>>>>>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
> >>>>>>>> Hello,
> >>>>>>>>
> >>>>>>>> I am running into spark assertion error when running a apache pipeline and below are the details:
> >>>>>>>>
> >>>>>>>> Apache Beam version: 2.1.0
> >>>>>>>> Spark version: 2.1.0
> >>>>>>>>
> >>>>>>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> >>>>>>>>     at scala.Predef$.assert(Predef.scala:179)
> >>>>>>>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
> >>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >>>>>>>>
> >>>>>>>> Can you please let me know if Apache beam v2.1.0 Spark runner is compatible to work with Spark v2.1.0?
> >>>>>>>>
> >>>>>>>> Below is the code snippet for the pipeline:
> >>>>>>>>
> >>>>>>>> PipelineOptionsFactory.register(CSVOptions.class);
> >>>>>>>> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
> >>>>>>>> options.setRunner(SparkRunner.class);
> >>>>>>>> options.setSparkMaster("local[4]");
> >>>>>>>> options.setEnableSparkMetricSinks(false);
> >>>>>>>> Pipeline p = Pipeline.create(options);
> >>>>>>>> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
> >>>>>>>>     .apply(new DataLoader())
> >>>>>>>>     .apply(JdbcIO.<String>write()
> >>>>>>>>         .withDataSourceConfiguration(
> >>>>>>>>             JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
> >>>>>>>>                 .withUsername("postgres").withPassword("postgres"))
> >>>>>>>>         .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
> >>>>>>>>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
> >>>>>>>>             public void setParameters(String element, PreparedStatement query) throws SQLException {
> >>>>>>>>                 String[] datas = element.split("\t");
> >>>>>>>>                 if (datas.length > 0) {
> >>>>>>>>                     for (int j = 0; j < datas.length; j++) {
> >>>>>>>>                         query.setString(j + 1, datas[j]);
> >>>>>>>>                     }
> >>>>>>>>                 }
> >>>>>>>>             }
> >>>>>>>>         }));
> >>>>>>>> SparkRunner runner = SparkRunner.create(options);
> >>>>>>>> runner.run(p).waitUntilFinish();
> >>>>>>>>
> >>>>>>>> Any help would be greatly appreciated.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Mahender
> >>>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Jean-Baptiste Onofré
> >>>>>>> jbonofre@apache.org
> >>>>>>> http://blog.nanthrax.net
> >>>>>>> Talend - http://www.talend.com
> >>>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Jean-Baptiste Onofré
> >>>>>> jbonofre@apache.org
> >>>>>> http://blog.nanthrax.net
> >>>>>> Talend - http://www.talend.com
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> Jean-Baptiste Onofré
> >>>>> jbonofre@apache.org
> >>>>> http://blog.nanthrax.net
> >>>>> Talend - http://www.talend.com
> >>>>>
> >>>>
> >>>> -- 
> >>>> Jean-Baptiste Onofré
> >>>> jbonofre@apache.org
> >>>> http://blog.nanthrax.net
> >>>> Talend - http://www.talend.com
> >>>> Hi JB,
> >>> I have tried this PR and I still see the same error, i.e. "Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy".
> >>> Is the change from Iterables to Iterators supposed to fix this issue, or am I missing something else? Please confirm.
> >>>
> >>> Thanks in Advance.
> >>> Suri
> >>>
> >>
> >> -- 
> >> Jean-Baptiste Onofré
> >> jbonofre@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >> Hi JB,
> > As discussed here: https://stackoverflow.com/questions/42336251/assertionerror-assertion-failed-copyandreset-must-return-a-zero-value-copy and https://github.com/bigdatagenomics/adam/issues/1021#issuecomment-216283222
> >
> > Does spark-core need to be changed? Can you please confirm.
> > 
> > Thanks in advance,
> > Suri
> > 
> 
> -- 
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 
Hi JB, 
your changes from PR #3808 (https://github.com/apache/beam/pull/3808), together with the AccumulatorV2 changes in spark-core discussed above, worked (i.e. the assertion is avoided).

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
I'm planning to resume my work on it. About the Accumulator, I think it's OK, 
but I will double check.

I will keep you posted.

Regards
JB

On 09/10/2017 08:48 AM, sureshkumarvepari@gmail.com wrote:
> 
> 
> On 2017-09-08 11:39, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>> Hmmm weird, let me check.
>>
>> Regards
>> JB
>>
>> On 09/07/2017 09:03 PM, sureshkumarvepari@gmail.com wrote:
>>>
>>>
>>> On 2017-09-05 19:01, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>>>> Hi guys
>>>>
>>>> I created PR #3808:
>>>>
>>>> https://github.com/apache/beam/pull/3808
>>>>
>>>> It introduces a new Spark 2.x specific runner (in addition of the Spark 1.x one).
>>>>
>>>> I have some questions (more for dev) that I let in a comment.
>>>>
>>>> It's still a work in progress as I have to fix unit tests and discuss about the
>>>> validate runner test. However you can already take a look.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
>>>>> Sure, thanks very much JB, will look forward to your link.
>>>>>
>>>>> -----Original Message-----
>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>>>> Sent: Monday, September 4, 2017 8:59 AM
>>>>> To: user@beam.apache.org
>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>>>
>>>>> Hi,
>>>>>
>>>>> Actually, I'm preparing the PR. I will send the PR link to you.
>>>>>
>>>>> Give me just some time to rebase my branch and push.
>>>>>
>>>>> Thanks,
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
>>>>>> Hi JB,
>>>>>>
>>>>>> If possible, could you please send me the code/location to download Spark Runner for Spark 2.x?
>>>>>>
>>>>>> Thanks,
>>>>>> Mahender
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>>>>> Sent: Friday, September 1, 2017 1:54 AM
>>>>>> To: user@beam.apache.org
>>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>>>>
>>>>>> Sure, I will send the PR during the weekend. I will let you know.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
>>>>>>> Thanks JB.  Could you please point me to the location of Spark Runner specific to Spark 2.x or is this something part any configurations?
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>>>>>>> Sent: Thursday, August 31, 2017 12:11 AM
>>>>>>> To: user@beam.apache.org
>>>>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm working on a Spark runner specific to Spark 2.x as the API changed.
>>>>>>>
>>>>>>> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I am running into spark assertion error when running a apache pipeline and below are the details:
>>>>>>>>
>>>>>>>> Apache Beam version: 2.1.0
>>>>>>>> Spark version: 2.1.0
>>>>>>>>
>>>>>>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>>>>>>>     at scala.Predef$.assert(Predef.scala:179)
>>>>>>>>     at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>>>>>>
>>>>>>>> Can you please let me know if Apache beam v2.1.0 Spark runner is compatible to work with Spark v2.1.0?
>>>>>>>>
>>>>>>>> Below is the code snippet for the pipeline:
>>>>>>>>
>>>>>>>> PipelineOptionsFactory.register(CSVOptions.class);
>>>>>>>> CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>>>>>>>> options.setRunner(SparkRunner.class);
>>>>>>>> options.setSparkMaster("local[4]");
>>>>>>>> options.setEnableSparkMetricSinks(false);
>>>>>>>> Pipeline p = Pipeline.create(options);
>>>>>>>> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>>>>>>>     .apply(new DataLoader())
>>>>>>>>     .apply(JdbcIO.<String>write()
>>>>>>>>         .withDataSourceConfiguration(
>>>>>>>>             JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>>>>>>>                 .withUsername("postgres").withPassword("postgres"))
>>>>>>>>         .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>>>>>>>         .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>>>>>>>             public void setParameters(String element, PreparedStatement query) throws SQLException {
>>>>>>>>                 String[] datas = element.split("\t");
>>>>>>>>                 if (datas.length > 0) {
>>>>>>>>                     for (int j = 0; j < datas.length; j++) {
>>>>>>>>                         query.setString(j + 1, datas[j]);
>>>>>>>>                     }
>>>>>>>>                 }
>>>>>>>>             }
>>>>>>>>         }));
>>>>>>>> SparkRunner runner = SparkRunner.create(options);
>>>>>>>> runner.run(p).waitUntilFinish();
>>>>>>>>
>>>>>>>> Any help would be greatly appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Mahender
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> jbonofre@apache.org
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> jbonofre@apache.org
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
>>>>>>
>>>>>
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> jbonofre@apache.org
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>>>
>>>>
>>>> -- 
>>>> Jean-Baptiste Onofré
>>>> jbonofre@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>>> Hi JB,
>>> I have tried this PR and I still see the same error, i.e. "Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy".
>>> Is the change from Iterables to Iterators supposed to fix this issue, or am I missing something else? Please confirm.
>>>
>>> Thanks in Advance.
>>> Suri
>>>
>>
>> -- 
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>> Hi JB,
> As discussed here: https://stackoverflow.com/questions/42336251/assertionerror-assertion-failed-copyandreset-must-return-a-zero-value-copy and https://github.com/bigdatagenomics/adam/issues/1021#issuecomment-216283222
>
> Does spark-core need to be changed? Can you please confirm.
> 
> Thanks in advance,
> Suri
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by "sureshkumarvepari@gmail.com" <su...@gmail.com>.

On 2017-09-08 11:39, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote: 
> Hmmm weird, let me check.
> 
> Regards
> JB
> 
> On 09/07/2017 09:03 PM, sureshkumarvepari@gmail.com wrote:
> > 
> > 
> > On 2017-09-05 19:01, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
> >> Hi guys
> >>
> >> I created PR #3808:
> >>
> >> https://github.com/apache/beam/pull/3808
> >>
> >> It introduces a new Spark 2.x specific runner (in addition to the Spark 1.x one).
> >>
> >> I have some questions (more for dev) that I left in a comment.
> >>
> >> It's still a work in progress, as I have to fix unit tests and discuss the
> >> validates-runner tests. However, you can already take a look.
> >>
> >> Regards
> >> JB
> >>
> >> On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
> >>> Sure, thanks very much JB, will look forward to your link.
> >>>
> >>> -----Original Message-----
> >>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> >>> Sent: Monday, September 4, 2017 8:59 AM
> >>> To: user@beam.apache.org
> >>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>
> >>> Hi,
> >>>
> >>> Actually, I'm preparing the PR. I will send the PR link to you.
> >>>
> >>> Give me just some time to rebase my branch and push.
> >>>
> >>> Thanks,
> >>> Regards
> >>> JB
> >>>
> >>> On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
> >>>> Hi JB,
> >>>>
> >>>> If possible, could you please send me the code/location to download Spark Runner for Spark 2.x?
> >>>>
> >>>> Thanks,
> >>>> Mahender
> >>>>
> >>>> -----Original Message-----
> >>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> >>>> Sent: Friday, September 1, 2017 1:54 AM
> >>>> To: user@beam.apache.org
> >>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>>
> >>>> Sure, I will send the PR during the weekend. I will let you know.
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> >>>>> Thanks JB.  Could you please point me to the location of the Spark Runner specific to Spark 2.x, or is this something set via configuration?
> >>>>>
> >>>>> -----Original Message-----
> >>>>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> >>>>> Sent: Thursday, August 31, 2017 12:11 AM
> >>>>> To: user@beam.apache.org
> >>>>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I'm working on a Spark runner specific to Spark 2.x as the API changed.
> >>>>>
> >>>>> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
> >>>>>
> >>>>> Regards
> >>>>> JB
> >>>>>
> >>>>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
> >>>>>> Hello,
> >>>>>>
> >>>>>> I am running into a Spark assertion error when running an Apache Beam
> >>>>>> pipeline; details below:
> >>>>>>
> >>>>>> Apache Beam version: 2.1.0
> >>>>>>
> >>>>>> Spark version: 2.1.0
> >>>>>>
> >>>>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> >>>>>>                at scala.Predef$.assert(Predef.scala:179)
> >>>>>>                at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
> >>>>>>                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>                at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >>>>>>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >>>>>>
> >>>>>> Can you please let me know if the Apache Beam v2.1.0 Spark runner is
> >>>>>> compatible with Spark v2.1.0?
> >>>>>>
> >>>>>> Below is the code snippet for the pipeline:
> >>>>>>
> >>>>>> PipelineOptionsFactory.register(CSVOptions.class);
> >>>>>> CSVOptions options = PipelineOptionsFactory.fromArgs(args)
> >>>>>>     .withValidation().as(CSVOptions.class);
> >>>>>> options.setRunner(SparkRunner.class);
> >>>>>> options.setSparkMaster("local[4]");
> >>>>>> options.setEnableSparkMetricSinks(false);
> >>>>>> Pipeline p = Pipeline.create(options);
> >>>>>> p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
> >>>>>>  .apply(new DataLoader())
> >>>>>>  .apply(JdbcIO.<String>write()
> >>>>>>      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
> >>>>>>          "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
> >>>>>>          .withUsername("postgres").withPassword("postgres"))
> >>>>>>      .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
> >>>>>>      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
> >>>>>>          public void setParameters(String element, PreparedStatement query) throws SQLException {
> >>>>>>              String[] datas = element.split("\t");
> >>>>>>              if (datas.length > 0) {
> >>>>>>                  for (int j = 0; j < datas.length; j++) {
> >>>>>>                      query.setString(j + 1, datas[j]);
> >>>>>>                  }
> >>>>>>              }
> >>>>>>          }
> >>>>>>      }));
> >>>>>> SparkRunner runner = SparkRunner.create(options);
> >>>>>> runner.run(p).waitUntilFinish();
> >>>>>>
> >>>>>> Any help would be greatly appreciated.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Mahender
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> Jean-Baptiste Onofré
> >>>>> jbonofre@apache.org
> >>>>> http://blog.nanthrax.net
> >>>>> Talend - http://www.talend.com
> >>>>>
> >>>>
> >>>> --
> >>>> Jean-Baptiste Onofré
> >>>> jbonofre@apache.org
> >>>> http://blog.nanthrax.net
> >>>> Talend - http://www.talend.com
> >>>>
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbonofre@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
> >>
> >> -- 
> >> Jean-Baptiste Onofré
> >> jbonofre@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >> Hi JB,
> > I have tried this PR and still I see the same error i.e Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> > Does the change from Iterables to iterators is supposed to fix this issue or missing something else . ?
> > please confirm.
> > 
> > Thanks in Advance.
> > Suri
> > 
> 
> -- 
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
Hi JB,
As discussed here: https://stackoverflow.com/questions/42336251/assertionerror-assertion-failed-copyandreset-must-return-a-zero-value-copy and https://github.com/bigdatagenomics/adam/issues/1021#issuecomment-216283222

Does spark-core need to be changed? Can you please confirm.

Thanks in advance,
Suri
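For anyone hitting the same assertion: Spark 2.x checks, just before serializing an accumulator (the AccumulatorV2.writeReplace frame in the trace above), that copyAndReset() returns a copy in its zero state. The following self-contained sketch is a simplified stand-in for that contract, not Spark's actual class; the class and method names beyond copyAndReset/isZero are invented for illustration:

```java
// Minimal mimic of the AccumulatorV2 contract that produces the error
// "copyAndReset must return a zero value copy" when violated.
abstract class MiniAccumulator<IN, OUT> {
    abstract boolean isZero();
    abstract MiniAccumulator<IN, OUT> copyAndReset();
    abstract void add(IN v);
    abstract OUT value();

    // Analogue of the check Spark runs before serializing an accumulator:
    // a broken copyAndReset() returning a non-zero copy would throw here.
    Object writeReplace() {
        MiniAccumulator<IN, OUT> copy = copyAndReset();
        if (!copy.isZero()) {
            throw new AssertionError(
                "assertion failed: copyAndReset must return a zero value copy");
        }
        return copy;
    }
}

// A well-behaved accumulator: copyAndReset() hands back a fresh zero instance.
class LongSum extends MiniAccumulator<Long, Long> {
    private long sum;
    @Override boolean isZero() { return sum == 0L; }
    @Override MiniAccumulator<Long, Long> copyAndReset() { return new LongSum(); }
    @Override void add(Long v) { sum += v; }
    @Override Long value() { return sum; }
}

public class AccumulatorContractDemo {
    public static void main(String[] args) {
        LongSum acc = new LongSum();
        acc.add(5L);
        acc.add(7L);
        System.out.println(acc.value());                 // 12
        LongSum zero = (LongSum) acc.writeReplace();     // passes the check
        System.out.println(zero.value());                // 0
    }
}
```

In other words, the assertion fires inside Spark, not in user code: some accumulator registered by the runner returns a non-zero copy, which is why the fix belongs in the runner rather than in the pipeline shown upthread.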


Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hmmm weird, let me check.

Regards
JB

On 09/07/2017 09:03 PM, sureshkumarvepari@gmail.com wrote:
> Hi JB,
> I have tried this PR and I still see the same error, i.e. Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
> Is the change from Iterables to Iterators supposed to fix this issue, or am I missing something else?
> Please confirm.
> 
> Thanks in Advance.
> Suri
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by "sureshkumarvepari@gmail.com" <su...@gmail.com>.

On 2017-09-05 19:01, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote: 
> Hi guys
> 
> I created PR #3808:
> 
> https://github.com/apache/beam/pull/3808
> 
> It introduces a new Spark 2.x specific runner (in addition to the Spark 1.x one).
> 
> I have some questions (more for dev) that I left in a comment.
> 
> It's still a work in progress, as I have to fix unit tests and discuss the
> validates-runner tests. However, you can already take a look.
> 
> Regards
> JB

Hi JB,
I have tried this PR and I still see the same error, i.e. Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
Is the change from Iterables to Iterators supposed to fix this issue, or am I missing something else?
Please confirm.

Thanks in Advance.
Suri
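A side note on the JdbcIO snippet quoted upthread, independent of the runner issue: the setter splits each line on tabs and binds parameters 1-based (JDBC indices start at 1), and since the statement has 16 placeholders, a short row would leave trailing parameters unset. Here is a minimal stand-alone sketch of that binding logic, using a hypothetical recorder in place of a real PreparedStatement:

```java
import java.util.ArrayList;
import java.util.List;

public class SetterLogicDemo {
    // Mirrors the setter upthread: query.setString(j + 1, datas[j]).
    // Records "index=value" strings instead of touching a database.
    static List<String> bind(String element) {
        List<String> bindings = new ArrayList<>();
        // Splitting with limit -1 keeps trailing empty columns, which the
        // original split("\t") would silently drop.
        String[] datas = element.split("\t", -1);
        for (int j = 0; j < datas.length; j++) {
            bindings.add((j + 1) + "=" + datas[j]); // JDBC indices are 1-based
        }
        return bindings;
    }

    public static void main(String[] args) {
        System.out.println(bind("a\tb\tc")); // [1=a, 2=b, 3=c]
    }
}
```

With the default split("\t"), a line ending in empty columns yields fewer bindings than placeholders and the insert fails at execution time, so the -1 limit is worth considering for this pipeline regardless of which Spark runner is used.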


Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi guys

I created PR #3808:

https://github.com/apache/beam/pull/3808

It introduces a new Spark 2.x specific runner (in addition to the Spark 1.x one).

I have some questions (more for dev) that I left in a comment.

It's still a work in progress, as I have to fix unit tests and discuss the
validates-runner tests. However, you can already take a look.

Regards
JB
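Until that Spark 2.x runner is released, one cheap safeguard is to fail fast when the cluster's Spark major version does not match what the runner on the classpath targets (the Beam 2.1.0 Spark runner was built against Spark 1.6.x). This is a hypothetical helper, not part of Beam:

```java
public class SparkVersionGuard {
    // Extracts the major version from a "major.minor.patch" version string.
    static int majorVersion(String sparkVersion) {
        return Integer.parseInt(sparkVersion.split("\\.")[0]);
    }

    public static void main(String[] args) {
        // In a real job this would come from the cluster, e.g. SparkContext's
        // version(); hard-coded here for illustration.
        String clusterVersion = "2.1.0";
        if (majorVersion(clusterVersion) != 1) {
            System.out.println("Spark " + clusterVersion
                    + " detected, but this runner targets Spark 1.x");
        }
    }
}
```

A check like this turns the opaque copyAndReset assertion into an explicit version-mismatch message before the pipeline is submitted.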

On 09/04/2017 04:00 PM, Mahender Devaruppala wrote:
> Sure, thanks very much JB, will look forward to your link.

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

RE: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Mahender Devaruppala <ma...@apporchid.com>.
Sure, thanks very much JB, will look forward to your link. 

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net] 
Sent: Monday, September 4, 2017 8:59 AM
To: user@beam.apache.org
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Hi,

Actually, I'm preparing the PR. I will send the PR link to you.

Give me just some time to rebase my branch and push.

Thanks,
Regards
JB


--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi,

Actually, I'm preparing the PR. I will send the PR link to you.

Give me just some time to rebase my branch and push.

Thanks,
Regards
JB

On 09/04/2017 03:15 PM, Mahender Devaruppala wrote:
> Hi JB,
> 
> If possible, could you please send me the code/location to download Spark Runner for Spark 2.x?
> 
> Thanks,
> Mahender
> 
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> Sent: Friday, September 1, 2017 1:54 AM
> To: user@beam.apache.org
> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> 
> Sure, I will send the PR during the weekend. I will let you know.
> 
> Regards
> JB
> 
> On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
>> Thanks JB.  Could you please point me to the location of the Spark Runner specific to Spark 2.x, or is this something set via configuration?
>>
>> -----Original Message-----
>> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
>> Sent: Thursday, August 31, 2017 12:11 AM
>> To: user@beam.apache.org
>> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
>>
>> Hi,
>>
>> I'm working on a Spark runner specific to Spark 2.x as the API changed.
>>
>> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
>>
>> Regards
>> JB
>>
>> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>>> Hello,
>>>
>>> I am running into a Spark assertion error when running an Apache Beam
>>> pipeline; details below:
>>>
>>> Apache Beam version: 2.1.0
>>>
>>> Spark version: 2.1.0
>>>
>>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>>                   at scala.Predef$.assert(Predef.scala:179)
>>>                   at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>>                   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>                   at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>>                   at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>>
>>> Can you please let me know if Apache beam v2.1.0 Spark runner is
>>> compatible to work with Spark v2.1.0?
>>>
>>> Below is the code snippet for the pipeline:
>>>
>>>           PipelineOptionsFactory./register/(CSVOptions.*class*);
>>>
>>>                 CSVOptions options=
>>> PipelineOptionsFactory./fromArgs/(args).withValidation().as(CSVOption
>>> s
>>> .*class*);
>>>
>>> options.setRunner(SparkRunner.*class*);
>>>
>>> options.setSparkMaster("local[4]");
>>>
>>> options.setEnableSparkMetricSinks(*false*);
>>>
>>>                 Pipeline p= Pipeline./create/(options);
>>>
>>> p.apply("ReadMyCSVFile",
>>> TextIO./read/().from(URIUtil./getFromPath/(options.getInputFile())))
>>>
>>>                 .apply(*new*DataLoader())
>>>
>>>                 
>>> .apply(JdbcIO.<String>/write/().withDataSourceConfiguration
>>>
>>>           
>>> (JdbcIO.DataSourceConfiguration./create/("org.postgresql.Driver","jdb
>>> c
>>> :postgresql://localhost:5432/beam")
>>>
>>>                            
>>> .withUsername("postgres").withPassword("postgres")).withStatement("in
>>> s ert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>>
>>>                               
>>> .withPreparedStatementSetter(*new*_JdbcIO.PreparedStatementSetter<Str
>>> i
>>> ng>()_ {
>>>
>>> *public**void*setParameters(String element, PreparedStatement query)
>>> *throws*SQLException {
>>>
>>>                                            String[] datas=
>>> element.split("\t");
>>>
>>> *if*(datas.length>0) {
>>>
>>> *for*(*int*j=0 ; j<datas.length;j++){
>>>
>>> query.setString(j+1, datas[j]);
>>>
>>>                                                   }
>>>
>>>                                            }
>>>
>>> }
>>>
>>>                   }));
>>>
>>>                 SparkRunner runner= SparkRunner./create/(options);
>>>
>>> runner.run(p).waitUntilFinish();
>>>
>>> Any help would be greatly appreciated.
>>>
>>> Thanks,
>>>
>>> Mahender
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbonofre@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
> 
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
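As a side note on the pipeline itself: the `PreparedStatementSetter` in the snippet above splits each input line on tabs and binds the fields at 1-based JDBC parameter indices. That loop can be sanity-checked in isolation with plain Java, no Beam, Spark, or live database required. In this sketch the `bound` list merely stands in for a real `PreparedStatement`:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitBindDemo {
    public static void main(String[] args) {
        String element = "col1\tcol2\tcol3";   // one tab-separated input line
        List<String> bound = new ArrayList<>();

        // Same shape as the setter in the pipeline snippet above.
        String[] datas = element.split("\t");
        if (datas.length > 0) {
            for (int j = 0; j < datas.length; j++) {
                // JDBC parameter indices are 1-based, hence j + 1.
                bound.add((j + 1) + "=" + datas[j]);
            }
        }
        System.out.println(bound); // [1=col1, 2=col2, 3=col3]
    }
}
```

One thing to watch for: the insert statement declares 16 placeholders, so a line that splits into fewer than 16 fields leaves parameters unset and the insert fails at execution time. Validating `datas.length` against the expected column count before binding avoids that.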

RE: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Mahender Devaruppala <ma...@apporchid.com>.
Hi JB,

If possible, could you please send me the code/location to download Spark Runner for Spark 2.x?

Thanks,
Mahender 

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net] 
Sent: Friday, September 1, 2017 1:54 AM
To: user@beam.apache.org
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Sure, I will send the PR during the weekend. I will let you know.

Regards
JB

On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> Thanks JB. Could you please point me to the location of the Spark Runner specific to Spark 2.x, or is this handled through configuration?
> 
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> Sent: Thursday, August 31, 2017 12:11 AM
> To: user@beam.apache.org
> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> 
> Hi,
> 
> I'm working on a Spark runner specific to Spark 2.x as the API changed.
> 
> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
> 
> Regards
> JB
> 
> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>> Hello,
>>
>> I am running into a Spark assertion error when running an Apache Beam
>> pipeline; the details are below:
>>
>> Apache Beam version: 2.1.0
>> Spark version: 2.1.0
>>
>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>                at scala.Predef$.assert(Predef.scala:179)
>>                at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>                at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>
>> Can you please let me know whether the Apache Beam v2.1.0 Spark runner is
>> compatible with Spark v2.1.0?
>>
>> Below is the code snippet for the pipeline:
>>
>>     PipelineOptionsFactory.register(CSVOptions.class);
>>     CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>>     options.setRunner(SparkRunner.class);
>>     options.setSparkMaster("local[4]");
>>     options.setEnableSparkMetricSinks(false);
>>     Pipeline p = Pipeline.create(options);
>>     p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>      .apply(new DataLoader())
>>      .apply(JdbcIO.<String>write()
>>          .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>>              "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>              .withUsername("postgres").withPassword("postgres"))
>>          .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>          .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>              public void setParameters(String element, PreparedStatement query) throws SQLException {
>>                  String[] datas = element.split("\t");
>>                  if (datas.length > 0) {
>>                      for (int j = 0; j < datas.length; j++) {
>>                          query.setString(j + 1, datas[j]);
>>                      }
>>                  }
>>              }
>>          }));
>>     SparkRunner runner = SparkRunner.create(options);
>>     runner.run(p).waitUntilFinish();
>>
>> Any help would be greatly appreciated.
>>
>> Thanks,
>> Mahender
>>
> 
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Sure, I will send the PR during the weekend. I will let you know.

Regards
JB

On 08/31/2017 03:31 PM, Mahender Devaruppala wrote:
> Thanks JB. Could you please point me to the location of the Spark Runner specific to Spark 2.x, or is this handled through configuration?
> 
> -----Original Message-----
> From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net]
> Sent: Thursday, August 31, 2017 12:11 AM
> To: user@beam.apache.org
> Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue
> 
> Hi,
> 
> I'm working on a Spark runner specific to Spark 2.x as the API changed.
> 
> So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.
> 
> Regards
> JB
> 
> On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
>> Hello,
>>
>> I am running into a Spark assertion error when running an Apache Beam
>> pipeline; the details are below:
>>
>> Apache Beam version: 2.1.0
>> Spark version: 2.1.0
>>
>> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>>                at scala.Predef$.assert(Predef.scala:179)
>>                at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>>                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>                at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>
>> Can you please let me know whether the Apache Beam v2.1.0 Spark runner is
>> compatible with Spark v2.1.0?
>>
>> Below is the code snippet for the pipeline:
>>
>>     PipelineOptionsFactory.register(CSVOptions.class);
>>     CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>>     options.setRunner(SparkRunner.class);
>>     options.setSparkMaster("local[4]");
>>     options.setEnableSparkMetricSinks(false);
>>     Pipeline p = Pipeline.create(options);
>>     p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>>      .apply(new DataLoader())
>>      .apply(JdbcIO.<String>write()
>>          .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>>              "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>>              .withUsername("postgres").withPassword("postgres"))
>>          .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>>          .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>>              public void setParameters(String element, PreparedStatement query) throws SQLException {
>>                  String[] datas = element.split("\t");
>>                  if (datas.length > 0) {
>>                      for (int j = 0; j < datas.length; j++) {
>>                          query.setString(j + 1, datas[j]);
>>                      }
>>                  }
>>              }
>>          }));
>>     SparkRunner runner = SparkRunner.create(options);
>>     runner.run(p).waitUntilFinish();
>>
>> Any help would be greatly appreciated.
>>
>> Thanks,
>> Mahender
>>
> 
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

RE: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Mahender Devaruppala <ma...@apporchid.com>.
Thanks JB. Could you please point me to the location of the Spark Runner specific to Spark 2.x, or is this handled through configuration?

-----Original Message-----
From: Jean-Baptiste Onofré [mailto:jb@nanthrax.net] 
Sent: Thursday, August 31, 2017 12:11 AM
To: user@beam.apache.org
Subject: Re: Apache Beam v2.1.0 - Spark Runner Issue

Hi,

I'm working on a Spark runner specific to Spark 2.x as the API changed.

So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.

Regards
JB

On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
> Hello,
> 
> I am running into a Spark assertion error when running an Apache Beam
> pipeline; the details are below:
> 
> Apache Beam version: 2.1.0
> Spark version: 2.1.0
> 
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>                at scala.Predef$.assert(Predef.scala:179)
>                at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>                at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> 
> Can you please let me know whether the Apache Beam v2.1.0 Spark runner is
> compatible with Spark v2.1.0?
> 
> Below is the code snippet for the pipeline:
> 
>     PipelineOptionsFactory.register(CSVOptions.class);
>     CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>     options.setRunner(SparkRunner.class);
>     options.setSparkMaster("local[4]");
>     options.setEnableSparkMetricSinks(false);
>     Pipeline p = Pipeline.create(options);
>     p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>      .apply(new DataLoader())
>      .apply(JdbcIO.<String>write()
>          .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>              "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>              .withUsername("postgres").withPassword("postgres"))
>          .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>          .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>              public void setParameters(String element, PreparedStatement query) throws SQLException {
>                  String[] datas = element.split("\t");
>                  if (datas.length > 0) {
>                      for (int j = 0; j < datas.length; j++) {
>                          query.setString(j + 1, datas[j]);
>                      }
>                  }
>              }
>          }));
>     SparkRunner runner = SparkRunner.create(options);
>     runner.run(p).waitUntilFinish();
> 
> Any help would be greatly appreciated.
> 
> Thanks,
> Mahender
> 

--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi,

I'm working on a Spark runner specific to Spark 2.x as the API changed.

So, for now, I have two runners: one for Spark 1.x and one for Spark 2.x.

Regards
JB

On 08/30/2017 11:45 PM, Mahender Devaruppala wrote:
> Hello,
> 
> I am running into a Spark assertion error when running an Apache Beam
> pipeline; the details are below:
> 
> Apache Beam version: 2.1.0
> Spark version: 2.1.0
> 
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>                at scala.Predef$.assert(Predef.scala:179)
>                at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>                at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> 
> Can you please let me know whether the Apache Beam v2.1.0 Spark runner is
> compatible with Spark v2.1.0?
> 
> Below is the code snippet for the pipeline:
> 
>     PipelineOptionsFactory.register(CSVOptions.class);
>     CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>     options.setRunner(SparkRunner.class);
>     options.setSparkMaster("local[4]");
>     options.setEnableSparkMetricSinks(false);
>     Pipeline p = Pipeline.create(options);
>     p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>      .apply(new DataLoader())
>      .apply(JdbcIO.<String>write()
>          .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>              "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>              .withUsername("postgres").withPassword("postgres"))
>          .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>          .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>              public void setParameters(String element, PreparedStatement query) throws SQLException {
>                  String[] datas = element.split("\t");
>                  if (datas.length > 0) {
>                      for (int j = 0; j < datas.length; j++) {
>                          query.setString(j + 1, datas[j]);
>                      }
>                  }
>              }
>          }));
>     SparkRunner runner = SparkRunner.create(options);
>     runner.run(p).waitUntilFinish();
> 
> Any help would be greatly appreciated.
> 
> Thanks,
> Mahender
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
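For context on the assertion itself: it is raised from Spark's `AccumulatorV2.writeReplace`, which calls `copyAndReset()` during serialization and requires the returned copy to satisfy `isZero()`. The following stdlib-only Java sketch models that contract with illustrative analogue classes (`MiniAccumulator` and `LongSum` are not Spark's API, just a simplified stand-in for it):

```java
// Minimal analogue of Spark's AccumulatorV2 contract (illustrative only, not Spark code).
abstract class MiniAccumulator<T> {
    abstract boolean isZero();          // true when the accumulator holds no data
    abstract MiniAccumulator<T> copy(); // snapshot of the current state
    abstract void reset();              // clear back to the zero value
    abstract void add(T v);

    // Mirrors the check in AccumulatorV2.writeReplace: the copy handed to
    // serialization must be a *zero* copy, or the reported AssertionError fires.
    final MiniAccumulator<T> copyAndReset() {
        MiniAccumulator<T> c = copy();
        c.reset();
        if (!c.isZero()) {
            throw new AssertionError("copyAndReset must return a zero value copy");
        }
        return c;
    }
}

class LongSum extends MiniAccumulator<Long> {
    private long sum;
    boolean isZero() { return sum == 0L; }
    LongSum copy() { LongSum c = new LongSum(); c.sum = this.sum; return c; }
    void reset() { sum = 0L; }
    void add(Long v) { sum += v; }
    long value() { return sum; }
}

public class AccumulatorContractDemo {
    public static void main(String[] args) {
        LongSum acc = new LongSum();
        acc.add(5L);
        acc.add(7L);
        MiniAccumulator<Long> zero = acc.copyAndReset();
        System.out.println(acc.value());   // original is untouched: 12
        System.out.println(zero.isZero()); // the serialized copy is zero: true
    }
}
```

An accumulator whose `reset()` or `isZero()` is inconsistent (for example, a copy that keeps state after reset) trips exactly this assertion, which is one reason an API mismatch between a runner built for Spark 1.x and a Spark 2.x cluster can surface here.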

Re: Apache Beam v2.1.0 - Spark Runner Issue

Posted by Lukasz Cwik <lc...@google.com>.
To my knowledge you should use Spark 1.6.3, since that is what is declared
as the spark.version in the project's root pom.xml.

On Wed, Aug 30, 2017 at 2:45 PM, Mahender Devaruppala <
mahenderd@apporchid.com> wrote:

> Hello,
>
> I am running into a Spark assertion error when running an Apache Beam
> pipeline; the details are below:
>
> Apache Beam version: 2.1.0
> Spark version: 2.1.0
>
> Caused by: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy
>                at scala.Predef$.assert(Predef.scala:179)
>                at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)
>                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>                at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
> Can you please let me know whether the Apache Beam v2.1.0 Spark runner is
> compatible with Spark v2.1.0?
>
> Below is the code snippet for the pipeline:
>
>     PipelineOptionsFactory.register(CSVOptions.class);
>     CSVOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CSVOptions.class);
>     options.setRunner(SparkRunner.class);
>     options.setSparkMaster("local[4]");
>     options.setEnableSparkMetricSinks(false);
>     Pipeline p = Pipeline.create(options);
>     p.apply("ReadMyCSVFile", TextIO.read().from(URIUtil.getFromPath(options.getInputFile())))
>      .apply(new DataLoader())
>      .apply(JdbcIO.<String>write()
>          .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>              "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/beam")
>              .withUsername("postgres").withPassword("postgres"))
>          .withStatement("insert into test_table values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
>          .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<String>() {
>              public void setParameters(String element, PreparedStatement query) throws SQLException {
>                  String[] datas = element.split("\t");
>                  if (datas.length > 0) {
>                      for (int j = 0; j < datas.length; j++) {
>                          query.setString(j + 1, datas[j]);
>                      }
>                  }
>              }
>          }));
>     SparkRunner runner = SparkRunner.create(options);
>     runner.run(p).waitUntilFinish();
>
> Any help would be greatly appreciated.
>
> Thanks,
> Mahender
>
>
>