Posted to user@spark.apache.org by Manohar753 <ma...@happiestminds.com> on 2015/07/15 11:13:15 UTC

DataFrame InsertIntoJdbc() Runtime Exception on cluster

Hi All,

I am trying to add a few new rows to an existing table in MySQL using a
DataFrame. It adds the new rows to the table in my local environment, but on
the Spark cluster I get the runtime exception below.


Exception in thread "main" java.lang.RuntimeException: Table msusers_1 already exists.
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:240)
        at org.apache.spark.sql.DataFrame.insertIntoJDBC(DataFrame.scala:1481)
        at com.sparkexpert.UserMigration.main(UserMigration.java:59)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/15 08:13:42 INFO spark.SparkContext: Invoking stop() from shutdown hook
15/07/15 08:13:42 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/07/15 08:13:

The code snippet is below:

        System.out.println(Query);
        // (uses: import static org.apache.spark.sql.functions.col;)
        Map<String, String> options = new HashMap<>();
        options.put("driver", PropertyLoader.getProperty(Constants.msSqlDriver));
        options.put("url", PropertyLoader.getProperty(Constants.msSqlURL));
        options.put("dbtable", Query);
        options.put("numPartitions", "1");
        DataFrame delatUsers = sqlContext.load("jdbc", options);

        delatUsers.show();

        // Load latest users DataFrame
        String mysQuery = "(SELECT * FROM msusers_1) as employees_name";
        Map<String, String> msoptions = new HashMap<>();
        msoptions.put("driver", PropertyLoader.getProperty(Constants.mysqlDriver));
        msoptions.put("url", PropertyLoader.getProperty(Constants.mysqlUrl));
        msoptions.put("dbtable", mysQuery);
        msoptions.put("numPartitions", "1");
        DataFrame latestUsers = sqlContext.load("jdbc", msoptions);

        // Get updated users data
        DataFrame updatedUsers = delatUsers.as("ms")
                .join(latestUsers.as("lat"),
                        col("lat.uid").equalTo(col("ms.uid")), "inner")
                .select("ms.revision", "ms.uid", "ms.UserType", "ms.FirstName",
                        "ms.LastName", "ms.Email", "ms.smsuser_id", "ms.dev_acct",
                        "ms.lastlogin", "ms.username", "ms.schoolAffiliation",
                        "ms.authsystem_id", "ms.AdminStatus");

        // Insert new users into the MySQL DB -- this is the line that throws
        delatUsers.except(updatedUsers).insertIntoJDBC(
                PropertyLoader.getProperty(Constants.mysqlUrl), "msusers_1", false);
The marked line is where the exception occurs.
Team, please give me some inputs if anyone has come across this.
Note that overwriting the same table works fine on the cluster as well.
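
For reference, the overwrite call that does work is the same statement with
the flag flipped (a minimal sketch against the same deprecated insertIntoJDBC
API, using the names from the snippet above):

        // overwrite = true drops and recreates msusers_1 first, so the
        // "Table already exists" check never fires
        delatUsers.except(updatedUsers).insertIntoJDBC(
                PropertyLoader.getProperty(Constants.mysqlUrl), "msusers_1", true);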

Thanks,
Manohar



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-InsertIntoJdbc-Runtime-Exception-on-cluster-tp23851.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: DataFrame InsertIntoJdbc() Runtime Exception on cluster

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Which version of Spark are you using? insertIntoJDBC is deprecated as of
1.4.0; you may use write().jdbc() instead.
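
Something along these lines (a minimal sketch, assuming Spark 1.4+; the
connection properties below are placeholder values you would fill in, not
names from your code):

        import java.util.Properties;
        import org.apache.spark.sql.SaveMode;

        // Placeholder connection settings -- fill in your own values.
        Properties props = new Properties();
        props.setProperty("driver", "com.mysql.jdbc.Driver");
        props.setProperty("user", "...");
        props.setProperty("password", "...");

        // Append the new rows instead of trying to create the table,
        // which avoids the "Table already exists" error.
        delatUsers.except(updatedUsers)
                .write()
                .mode(SaveMode.Append)
                .jdbc(PropertyLoader.getProperty(Constants.mysqlUrl),
                        "msusers_1", props);

SaveMode.Append leaves the existing table in place and only adds rows, which
is what insertIntoJDBC(url, table, false) was meant to do.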

Thanks
Best Regards

On Wed, Jul 15, 2015 at 2:43 PM, Manohar753 <manohar.reddy@happiestminds.com
> wrote:

> Hi All,
>
> I am trying to add a few new rows to an existing table in MySQL using a
> DataFrame. It adds the new rows to the table in my local environment, but
> on the Spark cluster I get the runtime exception below.
>
> [...]
>
> Thanks,
> Manohar