You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by akshayhazari <ak...@gmail.com> on 2014/11/11 16:24:18 UTC
Combining data from two tables in two databases postgresql,
JdbcRDD.
I want to be able to run a query over two tables that live in different
databases, and I would like to know whether this can be done. I've heard about
taking the union of two RDDs, but here I want to treat the two tables
something like different partitions of a single table.
Any help is appreciated. My current code is below.
import java.io.Serializable;
//import org.junit.*;
//import static org.junit.Assert.*;
import scala.*;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.*;
import scala.collection.mutable.LinkedHashMap;
//import static scala.collection.Map.Projection;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.StructType;
import org.apache.spark.sql.api.java.StructField;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import java.sql.*;
import java.util.*;
import com.mysql.jdbc.Driver;
import com.mysql.jdbc.*;
import java.io.*;
public class Spark_Mysql {
static class Z extends AbstractFunction0<java.sql.Connection> implements
Serializable
{
java.sql.Connection con;
public java.sql.Connection apply()
{
try {
con=DriverManager.getConnection("jdbc:mysql://localhost:3306/azkaban?user=azkaban&password=password");
}
catch(Exception e)
{
e.printStackTrace();
}
return con;
}
}
static public class Z1 extends AbstractFunction1<ResultSet,Integer>
implements Serializable
{
int ret;
public Integer apply(ResultSet i) {
try{
ret=i.getInt(1);
}
catch(Exception e)
{e.printStackTrace();}
return ret;
}
}
public static void main(String[] args) throws Exception {
String arr[]=new String[1];
arr[0]="/home/hduser/Documents/Credentials/Newest_Credentials_AX/spark-1.1.0-bin-hadoop1/lib/mysql-connector-java-5.1.33-bin.jar";
JavaSparkContext ctx = new JavaSparkContext(new
SparkConf().setAppName("JavaSparkSQL").setJars(arr));
SparkContext sctx = new SparkContext(new
SparkConf().setAppName("JavaSparkSQL").setJars(arr));
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
try
{
Class.forName("com.mysql.jdbc.Driver");
}
catch(Exception ex)
{
ex.printStackTrace();
System.exit(1);
}
JdbcRDD rdd=new JdbcRDD(sctx,new Z(),"SELECT * FROM spark WHERE ? <= id
AND id <= ?",0L, 1000L, 10,new
Z1(),scala.reflect.ClassTag$.MODULE$.AnyRef());
rdd.saveAsTextFile("hdfs://127.0.0.1:9000/user/hduser/mysqlrdd");
rdd.saveAsTextFile("/home/hduser/mysqlrdd");
}
}
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Combining-data-from-two-tables-in-two-databases-postgresql-JdbcRDD-tp18597.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org