Posted to user@spark.apache.org by Sarath Chandra <sa...@algofusiontech.com> on 2014/07/16 15:21:07 UTC

Simple record matching using Spark SQL

Hi All,

I'm trying to do simple record matching between two files and wrote the
following code:

import org.apache.spark.sql.SQLContext;
import org.apache.spark.rdd.RDD
object SqlTest {
  case class Test(fld1:String, fld2:String, fld3:String, fld4:String,
    fld4:String, fld5:Double, fld6:String);
  sc.addJar("test1-0.1.jar");
  val file1 = sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv");
  val file2 = sc.textFile("hdfs://localhost:54310/user/hduser/file2.csv");
  val sq = new SQLContext(sc);
  val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
    Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
  val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
    Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
  val file1_schema = sq.createSchemaRDD(file1_recs);
  val file2_schema = sq.createSchemaRDD(file2_recs);
  file1_schema.registerAsTable("file1_tab");
  file2_schema.registerAsTable("file2_tab");
  val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
    "l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and " +
    "l.fld2=s.fld2");
  val count = matched.count();
  System.out.println("Found " + matched.count() + " matching records");
}

When I run this program on a standalone Spark cluster, it keeps running for
a long time with no output or error. After waiting a few minutes I forcibly
kill it.
But the same program works fine when executed from the Spark shell.

What is going wrong? What am I missing?

~Sarath
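
One cheap thing to rule out with code like the above is malformed input: the map over split(",") assumes every line has seven columns and that the sixth parses as a Double. The sketch below is not a diagnosis of the hang. It is a minimal, Spark-free illustration of the same parsing step with a guard added. The names Record, ParseSketch and parse are hypothetical (not from the thread), and the field layout is assumed from the Test case class posted in the follow-up message.

```scala
import scala.util.Try

// Hypothetical record type; the seven-column layout (six strings plus one
// Double in the sixth position) is assumed from the follow-up message.
case class Record(fld1: String, fld2: String, fld3: String, fld4: String,
                  fld5: String, fld6: Double, fld7: String)

object ParseSketch {
  // Returns None for a line with fewer than seven columns or a non-numeric
  // sixth column, instead of throwing from inside a map().
  def parse(line: String): Option[Record] = {
    // limit -1 keeps trailing empty columns, which split(",") would drop
    val f = line.split(",", -1)
    if (f.length < 7) None
    else Try(f(5).trim.toDouble).toOption.map { amount =>
      Record(f(0), f(1), f(2), f(3), f(4), amount, f(6))
    }
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("a,b,c,d,e,1.5,k1", "too,short", "a,b,c,d,e,not-a-number,k2")
    val (good, bad) = lines.map(l => l -> parse(l)).partition(_._2.isDefined)
    println(s"parsed ${good.size} line(s), rejected ${bad.size}")
  }
}
```

Running the same function over each input file and counting the rejected lines would show quickly whether bad rows are in play at all.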

Re: Simple record matching using Spark SQL

Posted by Yin Huai <yh...@databricks.com>.
Hi Sarath,

Have you tried the current branch 1.0? If not, can you give it a try and
see if the problem can be resolved?

Thanks,

Yin


On Thu, Jul 24, 2014 at 11:17 AM, Yin Huai <yh...@databricks.com> wrote:

> Hi Sarath,
>
> I will try to reproduce the problem.
>
> Thanks,
>
> Yin
>
>
> On Wed, Jul 23, 2014 at 11:32 PM, Sarath Chandra <
> sarathchandra.josyam@algofusiontech.com> wrote:
>
>> Hi Michael,
>>
>> Sorry for the delayed response.
>>
>> I'm using Spark 1.0.1 (the pre-built version for Hadoop 1). I'm running Spark
>> programs on a standalone Spark cluster with 2 nodes. One node works as
>> both master and worker, while the other node is just a worker.
>>
>> I didn't quite understand what you meant by "jstack of the driver and
>> executor", so I'm attaching the log files generated in $SPARK_HOME/logs and
>> the stdout and stderr files for this job from the $SPARK_HOME/work folder on
>> both nodes.
>>
>> Also attaching the program which I executed. If I uncomment the lines 36
>> & 37 the program works fine, otherwise it just keeps running forever.
>>
>> ~Sarath.
>>
>>
>> On Thu, Jul 17, 2014 at 9:35 PM, Michael Armbrust <michael@databricks.com
>> > wrote:
>>
>>> What version are you running?  Could you provide a jstack of the driver
>>> and executor when it is hanging?
>>>
>>>
>>> On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra <
>>> sarathchandra.josyam@algofusiontech.com> wrote:
>>>
>>>> I added the 2 lines below just before the SQL query line:
>>>> ...
>>>> file1_schema.count;
>>>> file2_schema.count;
>>>> ...
>>>> and it started working, but I don't understand why.
>>>>
>>>> Can someone please explain this to me? What was happening earlier, and
>>>> what changes with the addition of these 2 lines?
>>>>
>>>> ~Sarath
>>>>
>>>>
>>>> On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra <
>>>> sarathchandra.josyam@algofusiontech.com> wrote:
>>>>
>>>>> No Sonal, I'm not making any explicit call to stop the context.
>>>>>
>>>>> If you see my previous post to Michael, the commented-out portion of the
>>>>> code is what I need to run. When I run it on a standalone Spark cluster,
>>>>> the execution keeps running with no output or error. After waiting for
>>>>> several minutes I kill it by pressing Ctrl+C in the terminal.
>>>>>
>>>>> But the same code runs perfectly when executed from the Spark shell.
>>>>>
>>>>> ~Sarath
>>>>>
>>>>>
>>>>> On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal <so...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Sarath,
>>>>>>
>>>>>> Are you explicitly stopping the context?
>>>>>>
>>>>>> sc.stop()
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best Regards,
>>>>>> Sonal
>>>>>> Nube Technologies <http://www.nubetech.co>
>>>>>>
>>>>>> <http://in.linkedin.com/in/sonalgoyal>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra <
>>>>>> sarathchandra.josyam@algofusiontech.com> wrote:
>>>>>>
>>>>>>> Hi Michael, Soumya,
>>>>>>>
>>>>>>> Can you please check and let me know what the issue is? What am I
>>>>>>> missing?
>>>>>>> Let me know if you need any logs to analyze.
>>>>>>>
>>>>>>> ~Sarath
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 16, 2014 at 8:24 PM, Sarath Chandra <
>>>>>>> sarathchandra.josyam@algofusiontech.com> wrote:
>>>>>>>
>>>>>>>> Hi Michael,
>>>>>>>>
>>>>>>>> Tried it. It correctly prints the line counts of both files.
>>>>>>>> Here's what I tried:
>>>>>>>>
>>>>>>>> Code:
>>>>>>>> package test
>>>>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>>>>> object Test4 {
>>>>>>>>   case class Test(fld1: String,
>>>>>>>>     fld2: String,
>>>>>>>>     fld3: String,
>>>>>>>>     fld4: String,
>>>>>>>>     fld5: String,
>>>>>>>>     fld6: Double,
>>>>>>>>     fld7: String);
>>>>>>>>   def main(args: Array[String]) {
>>>>>>>>     val conf = new SparkConf()
>>>>>>>>       .setMaster(args(0))
>>>>>>>>       .setAppName("SQLTest")
>>>>>>>>       .setSparkHome(args(1))
>>>>>>>>       .set("spark.executor.memory", "2g");
>>>>>>>>     val sc = new SparkContext(conf);
>>>>>>>>     sc.addJar("test1-0.1.jar");
>>>>>>>>     val file1 = sc.textFile(args(2));
>>>>>>>>     println(file1.count());
>>>>>>>>     val file2 = sc.textFile(args(3));
>>>>>>>>     println(file2.count());
>>>>>>>> //    val sq = new SQLContext(sc);
>>>>>>>> //    import sq._
>>>>>>>> //    val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
>>>>>>>> //      Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
>>>>>>>> //    val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
>>>>>>>> //      Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
>>>>>>>> //    val file1_schema = sq.createSchemaRDD(file1_recs);
>>>>>>>> //    val file2_schema = sq.createSchemaRDD(file2_recs);
>>>>>>>> //    file1_schema.registerAsTable("file1_tab");
>>>>>>>> //    file2_schema.registerAsTable("file2_tab");
>>>>>>>> //    val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
>>>>>>>> //     "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
>>>>>>>> //     "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
>>>>>>>> //     "l.fld6=s.fld6");
>>>>>>>> //    matched.collect().foreach(println);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Execution:
>>>>>>>> export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
>>>>>>>> export CONFIG_OPTS="-Dspark.jars=test1-0.1.jar"
>>>>>>>> java -cp $CLASSPATH $CONFIG_OPTS test.Test4 spark://master:7077 \
>>>>>>>>   "/usr/local/spark-1.0.1-bin-hadoop1" \
>>>>>>>>   hdfs://master:54310/user/hduser/file1.csv \
>>>>>>>>   hdfs://master:54310/user/hduser/file2.csv
>>>>>>>>
>>>>>>>> ~Sarath
>>>>>>>>
>>>>>>>> On Wed, Jul 16, 2014 at 8:14 PM, Michael Armbrust <
>>>>>>>> michael@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> What if you just run something like:
>>>>>>>>> sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 16, 2014 at 10:37 AM, Sarath Chandra <
>>>>>>>>> sarathchandra.josyam@algofusiontech.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes Soumya, I did it.
>>>>>>>>>>
>>>>>>>>>> First I tried the example from the documentation (the one that uses
>>>>>>>>>> the people table to find teenagers). After running it successfully,
>>>>>>>>>> I moved on to this one, which is the starting point for a bigger
>>>>>>>>>> requirement for which I'm evaluating Spark SQL.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 16, 2014 at 7:59 PM, Soumya Simanta <
>>>>>>>>>> soumya.simanta@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Can you try submitting a very simple job to the cluster?
>>>>>>>>>>>
>>>>>>>>>>> On Jul 16, 2014, at 10:25 AM, Sarath Chandra <
>>>>>>>>>>> sarathchandra.josyam@algofusiontech.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Yes, it appears on the Spark UI and remains there with state
>>>>>>>>>>> "RUNNING" until I press Ctrl+C in the terminal to kill the
>>>>>>>>>>> execution.
>>>>>>>>>>>
>>>>>>>>>>> Apart from the statements that create the Spark context, if I
>>>>>>>>>>> copy and paste the lines of my code into the Spark shell, it runs
>>>>>>>>>>> perfectly and gives the desired output.
>>>>>>>>>>>
>>>>>>>>>>> ~Sarath
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 16, 2014 at 7:48 PM, Soumya Simanta <
>>>>>>>>>>> soumya.simanta@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> When you submit your job, it should appear on the Spark UI.
>>>>>>>>>>>> Same with the REPL. Make sure your job is submitted to the cluster properly.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 16, 2014 at 10:08 AM, Sarath Chandra <
>>>>>>>>>>>> sarathchandra.josyam@algofusiontech.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Soumya,
>>>>>>>>>>>>>
>>>>>>>>>>>>> The data is very small, 500+ lines in each file.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I removed the last 2 lines and placed this at the end:
>>>>>>>>>>>>> "matched.collect().foreach(println);". Still no luck. It's been more
>>>>>>>>>>>>> than 5 minutes and the execution is still running.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I checked the logs: nothing in stdout, and in stderr I don't see
>>>>>>>>>>>>> anything going wrong; all are info messages.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What else do I need to check?
>>>>>>>>>>>>>
>>>>>>>>>>>>> ~Sarath
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 16, 2014 at 7:23 PM, Soumya Simanta <
>>>>>>>>>>>>> soumya.simanta@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Check your executor logs for the output or, if your data is
>>>>>>>>>>>>>> not big, collect it in the driver and print it.

Re: Simple record matching using Spark SQL

Posted by Yin Huai <yh...@databricks.com>.
Hi Sarath,

I will try to reproduce the problem.

Thanks,

Yin



Re: Simple record matching using Spark SQL

Posted by Sarath Chandra <sa...@algofusiontech.com>.
Hi Michael,

Sorry for the delayed response.

I'm using Spark 1.0.1 (the pre-built version for Hadoop 1). I'm running Spark
programs on a standalone Spark cluster with 2 nodes. One node works as
both master and worker, while the other node is just a worker.

I didn't quite understand what you meant by "jstack of the driver and
executor", so I'm attaching the log files generated in $SPARK_HOME/logs and the
stdout and stderr files for this job from the $SPARK_HOME/work folder on both
nodes.

Also attaching the program which I executed. If I uncomment the lines 36 &
37 the program works fine, otherwise it just keeps running forever.

~Sarath.



Re: Simple record matching using Spark SQL

Posted by Michael Armbrust <mi...@databricks.com>.
What version are you running?  Could you provide a jstack of the driver and
executor when it is hanging?
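
A thread dump is normally taken from outside the process with the JDK tools: `jps` lists the running JVMs and `jstack <pid>` prints the stacks of one of them. Where attaching from outside is inconvenient, a rough in-process equivalent can be sketched as below. This is only an approximation of what jstack reports, and the object name `ThreadDumpSketch` is made up for illustration.

```scala
import scala.collection.JavaConverters._

object ThreadDumpSketch {
  // Builds a jstack-like text dump of every live thread in the current JVM.
  def dump(): String = {
    val traces = Thread.getAllStackTraces.asScala
    traces.map { case (thread, frames) =>
      val header = "\"" + thread.getName + "\" state=" + thread.getState
      // One header line per thread, followed by its stack frames.
      (header +: frames.map(f => "    at " + f)).mkString("\n")
    }.mkString("\n\n")
  }

  def main(args: Array[String]): Unit = println(dump())
}
```

Calling `ThreadDumpSketch.dump()` from the driver while the job appears stuck would show which threads are blocked and where. For executors, running jstack on the worker machines is still the more direct route.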


On Thu, Jul 17, 2014 at 10:55 AM, Sarath Chandra <
sarathchandra.josyam@algofusiontech.com> wrote:

> I added the two lines below just before the SQL query line:
> ...
> file1_schema.count;
> file2_schema.count;
> ...
> and it started working, but I don't understand why.
>
> Can someone please explain what was happening earlier, and what changes
> with the addition of these two lines?
>
> ~Sarath

Re: Simple record matching using Spark SQL

Posted by Sarath Chandra <sa...@algofusiontech.com>.
I added the two lines below just before the SQL query line:
...
file1_schema.count;
file2_schema.count;
...
and it started working, but I don't understand why.

Can someone please explain what was happening earlier, and what changes
with the addition of these two lines?

~Sarath
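
For readers following along, the placement of the two calls can be sketched as below. It assumes the Spark 1.0-era API used elsewhere in this thread (`createSchemaRDD`, `registerAsTable`). The names `SqlTestSketch`, `Rec` and `load` are invented for illustration, and the argument order (master URL, then the two file paths) is an assumption. The thread does not establish why the extra calls help; the only certain point is that `count()` is an action, so each call runs a job over its RDD before the join is submitted.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

object SqlTestSketch {
  // Seven distinct field names; the case class in the first post declared fld4 twice.
  case class Rec(fld1: String, fld2: String, fld3: String, fld4: String,
                 fld5: String, fld6: Double, fld7: String)

  def main(args: Array[String]): Unit = {
    // Assumed arguments: args(0) = master URL, args(1) and args(2) = input files.
    val conf = new SparkConf().setMaster(args(0)).setAppName("SqlTestSketch")
    val sc = new SparkContext(conf)
    val sq = new SQLContext(sc)

    // Parses each comma-separated line into a Rec.
    def load(path: String): RDD[Rec] =
      sc.textFile(path).map(_.split(",")).map(f =>
        Rec(f(0), f(1), f(2), f(3), f(4), f(5).toDouble, f(6)))

    val file1_schema = sq.createSchemaRDD(load(args(1)))
    val file2_schema = sq.createSchemaRDD(load(args(2)))
    file1_schema.registerAsTable("file1_tab")
    file2_schema.registerAsTable("file2_tab")

    // The two lines reported to make the job complete.
    file1_schema.count()
    file2_schema.count()

    val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
      "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
      "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
      "l.fld6=s.fld6")
    println("Found " + matched.count() + " matching records")
    sc.stop()
  }
}
```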


On Thu, Jul 17, 2014 at 1:13 PM, Sarath Chandra <
sarathchandra.josyam@algofusiontech.com> wrote:

> No Sonal, I'm not making any explicit call to stop the context.
>
> If you look at my previous post to Michael, the commented-out portion of
> the code is what I actually need to run. When I run it on a standalone
> Spark cluster, the execution keeps running with no output or error. After
> waiting several minutes, I kill it by pressing Ctrl+C in the terminal.
>
> But the same code runs perfectly when executed from the Spark shell.
>
> ~Sarath

Re: Simple record matching using Spark SQL

Posted by Sarath Chandra <sa...@algofusiontech.com>.
No Sonal, I'm not making any explicit call to stop the context.

If you look at my previous post to Michael, the commented-out portion of the
code is what I actually need to run. When I run it on a standalone Spark
cluster, the execution keeps running with no output or error. After waiting
several minutes, I kill it by pressing Ctrl+C in the terminal.

But the same code runs perfectly when executed from the Spark shell.

~Sarath


On Thu, Jul 17, 2014 at 1:05 PM, Sonal Goyal <so...@gmail.com> wrote:

> Hi Sarath,
>
> Are you explicitly stopping the context?
>
> sc.stop()

Re: Simple record matching using Spark SQL

Posted by Sonal Goyal <so...@gmail.com>.
Hi Sarath,

Are you explicitly stopping the context?

sc.stop()
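
One way to make sure the context is always stopped is to wrap the job in a try/finally. The helper below is a minimal sketch of that idea; `StopContextSketch` and `withStop` are hypothetical names, not part of Spark, and the helper is written generically so that it compiles without Spark on the classpath. Whether a missing `stop()` explains the hang is not settled in this thread.

```scala
object StopContextSketch {
  // Runs `body` against `resource` and always calls `stop` afterwards,
  // even when `body` throws.
  def withStop[R, A](resource: R)(stop: R => Unit)(body: R => A): A =
    try body(resource) finally stop(resource)

  // Intended use with Spark (shown as a comment because it needs Spark to compile):
  //   withStop(new SparkContext(conf))(_.stop()) { sc =>
  //     println(sc.textFile(path).count())
  //   }
}
```

The same effect can be had by writing `try { ... } finally { sc.stop() }` directly inside `main`; the helper only keeps the cleanup in one place.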




Best Regards,
Sonal
Nube Technologies <http://www.nubetech.co>

<http://in.linkedin.com/in/sonalgoyal>




On Thu, Jul 17, 2014 at 12:51 PM, Sarath Chandra <
sarathchandra.josyam@algofusiontech.com> wrote:

> Hi Michael, Soumya,
>
> Can you please check and let me know what the issue is? What am I missing?
> Let me know if you need any logs to analyze.
>
> ~Sarath

Re: Simple record matching using Spark SQL

Posted by Sarath Chandra <sa...@algofusiontech.com>.
Hi Michael, Soumya,

Can you please check and let me know what the issue is? What am I missing?
Let me know if you need any logs to analyze.

~Sarath


Re: Simple record matching using Spark SQL

Posted by Sarath Chandra <sa...@algofusiontech.com>.
Hi Michael,

I tried it. It correctly prints the line counts of both files. Here's
what I tried:

Code:

package test

import org.apache.spark.{SparkConf, SparkContext}
// Needed again once the commented-out SQL section below is re-enabled:
// import org.apache.spark.rdd.RDD
// import org.apache.spark.sql.SQLContext

object Test4 {
  case class Test(fld1: String,
                  fld2: String,
                  fld3: String,
                  fld4: String,
                  fld5: String,
                  fld6: Double,
                  fld7: String)

  def main(args: Array[String]): Unit = {
    // args: master URL, Spark home, path of file 1, path of file 2
    val conf = new SparkConf()
      .setMaster(args(0))
      .setAppName("SQLTest")
      .setSparkHome(args(1))
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    sc.addJar("test1-0.1.jar")
    val file1 = sc.textFile(args(2))
    println(file1.count())
    val file2 = sc.textFile(args(3))
    println(file2.count())
    // SQL section disabled for this test; only the two counts above run.
//    val sq = new SQLContext(sc)
//    import sq._
//    val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l =>
//      Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)))
//    val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s =>
//      Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)))
//    val file1_schema = sq.createSchemaRDD(file1_recs)
//    val file2_schema = sq.createSchemaRDD(file2_recs)
//    file1_schema.registerAsTable("file1_tab")
//    file2_schema.registerAsTable("file2_tab")
//    val matched = sq.sql("select * from file1_tab l join file2_tab s on " +
//      "l.fld7=s.fld7 where l.fld2=s.fld2 and " +
//      "l.fld3=s.fld3 and l.fld4=s.fld4 and " +
//      "l.fld6=s.fld6")
//    matched.collect().foreach(println)
  }
}

Execution:

export CLASSPATH=$HADOOP_PREFIX/conf:$SPARK_HOME/lib/*:test1-0.1.jar
export CONFIG_OPTS="-Dspark.jars=test1-0.1.jar"
java -cp "$CLASSPATH" $CONFIG_OPTS test.Test4 spark://master:7077 \
  "/usr/local/spark-1.0.1-bin-hadoop1" \
  hdfs://master:54310/user/hduser/file1.csv \
  hdfs://master:54310/user/hduser/file2.csv

~Sarath
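
One thing worth ruling out with parsing code like the above: split(",")
drops trailing empty fields, and toDouble throws on non-numeric input, so a
single malformed line makes the whole task fail. Below is a minimal sketch of
a stricter parser in plain Scala with no Spark dependency. The names Rec and
RecParser are illustrative and do not come from the thread, and nothing here
is confirmed to be the cause of the hang.

```scala
import scala.util.Try

// Hypothetical seven-column record mirroring the Test case class above.
case class Rec(fld1: String, fld2: String, fld3: String, fld4: String,
               fld5: String, fld6: Double, fld7: String)

object RecParser {
  // Returns None for a line that does not have exactly seven fields or
  // whose sixth field is not a number, instead of throwing inside a task.
  def parse(line: String): Option[Rec] = {
    // limit = -1 keeps trailing empty fields, which split(",") would drop
    val f = line.split(",", -1)
    if (f.length != 7) None
    else Try(f(5).trim.toDouble).toOption.map { d =>
      Rec(f(0), f(1), f(2), f(3), f(4), d, f(6))
    }
  }
}
```

In the Spark program this could replace the two map calls, for example
file1.flatMap(line => RecParser.parse(line).toList), so that bad lines are
skipped rather than aborting the job.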

Re: Simple record matching using Spark SQL

Posted by Michael Armbrust <mi...@databricks.com>.
What if you just run something like:
sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv").count()


Re: Simple record matching using Spark SQL

Posted by Sarath Chandra <sa...@algofusiontech.com>.
Yes Soumya, I did it.

First I tried the example available in the documentation (the one using the
people table and finding teenagers). After running it successfully, I moved
on to this one, which is the starting point for a bigger requirement for
which I'm evaluating Spark SQL.


Re: Simple record matching using Spark SQL

Posted by Soumya Simanta <so...@gmail.com>.

Can you try submitting a very simple job to the cluster?

Re: Simple record matching using Spark SQL

Posted by Sarath Chandra <sa...@algofusiontech.com>.
Yes, it appears on the Spark UI and stays there in state "RUNNING" until I
press Ctrl+C in the terminal to kill the execution.

Apart from the statements that create the Spark context, if I copy and paste
the lines of my code into the Spark shell, they run perfectly and give the
desired output.

~Sarath
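
When a driver started with plain java stays in RUNNING with only INFO
messages, a thread dump of the driver JVM usually shows where it is blocked.
jstack <pid> produces one from outside the process; the sketch below does
the same from inside using only the JDK. ThreadDump, render and dumpAfter
are hypothetical helper names, not part of Spark.

```scala
object ThreadDump {
  // Renders the name, state and stack of every live thread in this JVM.
  def render(): String = {
    val sb = new StringBuilder
    val it = Thread.getAllStackTraces.entrySet.iterator
    while (it.hasNext) {
      val e = it.next()
      val t = e.getKey
      sb.append("\"" + t.getName + "\" state=" + t.getState + "\n")
      e.getValue.foreach(frame => sb.append("    at " + frame + "\n"))
      sb.append("\n")
    }
    sb.toString
  }

  // Starts a daemon thread that prints a dump to stderr after delayMs.
  // Interrupt the returned thread once the guarded action has finished.
  def dumpAfter(delayMs: Long): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = {
        try {
          Thread.sleep(delayMs)
          System.err.println(render())
        } catch {
          case _: InterruptedException => () // cancelled: nothing to report
        }
      }
    })
    t.setDaemon(true)
    t.start()
    t
  }
}
```

A possible use is val w = ThreadDump.dumpAfter(60000) just before the
matched.collect() call, followed by w.interrupt() once it returns. If the
action hangs, the dump lands in the driver's stderr after one minute.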

Re: Simple record matching using Spark SQL

Posted by Soumya Simanta <so...@gmail.com>.
When you submit your job, it should appear on the Spark UI. The same goes for
the REPL. Make sure your job is submitted to the cluster properly.


Re: Simple record matching using Spark SQL

Posted by Sarath Chandra <sa...@algofusiontech.com>.
Hi Soumya,

The data is very small, 500+ lines in each file.

I removed the last 2 lines and placed this at the end:
"matched.collect().foreach(println);". Still no luck. It has been more than
5 minutes and the execution is still running.

I checked the logs: nothing in stdout, and in stderr I don't see anything
going wrong; all are INFO messages.

What else do I need to check?

~Sarath
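
With only about 500 lines per file, the expected result can be computed in
plain Scala on one machine and compared with what the cluster returns. The
sketch below assumes the seven-column layout and the key columns of the
Test4 listing (join on fld7, equality on fld2, fld3, fld4 and fld6). Row7
and LocalMatch are made-up names for illustration.

```scala
// Hypothetical seven-column record; field names follow the Test4 listing.
case class Row7(fld1: String, fld2: String, fld3: String, fld4: String,
                fld5: String, fld6: Double, fld7: String)

object LocalMatch {
  // Composite key built from the columns the query compares.
  private def key(r: Row7) = (r.fld2, r.fld3, r.fld4, r.fld6, r.fld7)

  // Inner join: every pair (l, s) whose key columns are all equal.
  def matched(left: Seq[Row7], right: Seq[Row7]): Seq[(Row7, Row7)] = {
    val byKey = right.groupBy(key)
    for {
      l <- left
      s <- byKey.getOrElse(key(l), Seq.empty)
    } yield (l, s)
  }
}
```

Grouping the right side by key first avoids comparing every pair of rows,
although at this data size either approach finishes almost instantly.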

Re: Simple record matching using Spark SQL

Posted by Soumya Simanta <so...@gmail.com>.
Check your executor logs for the output or, if your data is not big, collect it in the driver and print it.
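
A rough sketch of the "collect it in the driver and print it" suggestion, written as a standalone application rather than shell input. It assumes the Spark 1.0.x API used in the quoted code (SQLContext, createSchemaRDD, registerAsTable). Unlike the shell, a standalone application has no predefined "sc", so the sketch creates its own SparkContext. The object name SqlTestApp, the helper parseRecord, the two command-line path arguments and the field name fld4b (standing in for the repeated fld4, which would not compile) are all assumptions made for illustration, not part of the original program. This is not a diagnosis of the hang, only one way to make the output appear on the driver.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumption: the second "fld4" of the original case class is renamed to the
// hypothetical "fld4b" so the class compiles; all other names are unchanged.
// The case class is declared at the top level so Spark SQL can infer its schema.
case class Test(fld1: String, fld2: String, fld3: String, fld4: String,
                fld4b: String, fld5: Double, fld6: String)

object SqlTestApp {
  // Parse one comma-separated line into a Test record.
  // No quoting or escaping is handled; limit -1 keeps trailing empty fields.
  def parseRecord(line: String): Test = {
    val f = line.split(",", -1)
    Test(f(0), f(1), f(2), f(3), f(4), f(5).toDouble, f(6))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: the paths of the two input files.
    // Fails with a MatchError unless exactly two arguments are given.
    val Array(path1, path2) = args

    // The master URL is assumed to be supplied by the launcher (e.g. spark-submit).
    val sc = new SparkContext(new SparkConf().setAppName("SqlTestApp"))
    val sq = new SQLContext(sc)
    import sq.createSchemaRDD // implicit conversion RDD[Test] => SchemaRDD

    sc.textFile(path1).map(parseRecord).registerAsTable("file1_tab")
    sc.textFile(path2).map(parseRecord).registerAsTable("file2_tab")

    val matched = sq.sql(
      "select * from file1_tab l join file2_tab s on l.fld6=s.fld6 " +
      "where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2")

    // collect() brings every matching row back to the driver, so the output
    // appears in the driver's stdout; only sensible for small results.
    val rows = matched.collect()
    rows.foreach(println)
    println("Found " + rows.length + " matching records")

    sc.stop()
  }
}
```

Collecting once and reusing the array avoids running the join twice, which the original count() followed by a second count() would do.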



> On Jul 16, 2014, at 9:21 AM, Sarath Chandra <sa...@algofusiontech.com> wrote:
> 
> Hi All,
> 
> I'm trying to do a simple record matching between 2 files and wrote following code -
> 
> import org.apache.spark.sql.SQLContext;
> import org.apache.spark.rdd.RDD
> object SqlTest {
>   case class Test(fld1:String, fld2:String, fld3:String, fld4:String, fld4:String, fld5:Double, fld6:String);
>   sc.addJar("test1-0.1.jar");
>   val file1 = sc.textFile("hdfs://localhost:54310/user/hduser/file1.csv");
>   val file2 = sc.textFile("hdfs://localhost:54310/user/hduser/file2.csv");
>   val sq = new SQLContext(sc);
>   val file1_recs: RDD[Test] = file1.map(_.split(",")).map(l => Test(l(0), l(1), l(2), l(3), l(4), l(5).toDouble, l(6)));
>   val file2_recs: RDD[Test] = file2.map(_.split(",")).map(s => Test(s(0), s(1), s(2), s(3), s(4), s(5).toDouble, s(6)));
>   val file1_schema = sq.createSchemaRDD(file1_recs);
>   val file2_schema = sq.createSchemaRDD(file2_recs);
>   file1_schema.registerAsTable("file1_tab");
>   file2_schema.registerAsTable("file2_tab");
>   val matched = sq.sql("select * from file1_tab l join file2_tab s on l.fld6=s.fld6 where l.fld3=s.fld3 and l.fld4=s.fld4 and l.fld5=s.fld5 and l.fld2=s.fld2");
>   val count = matched.count();
>   System.out.println("Found " + matched.count() + " matching records");
> }
> 
> When I run this program on a standalone spark cluster, it keeps running for long with no output or error. After waiting for few mins I'm forcibly killing it.
> But the same program is working well when executed from a spark shell.
> 
> What is going wrong? What am I missing?
> 
> ~Sarath