You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by evil <qi...@gmail.com> on 2014/12/30 04:29:54 UTC

A question about using insert into in rdd foreach in spark 1.2

Hi All,
I have a  problem when I try to use insert into in loop, and this is my code  
def main(args: Array[String]) {
    //This is an empty table, schema is (Int,String)
   
sqlContext.parquetFile("Data\\Test\\Parquet\\Temp").registerTempTable("temp")
    //not empty table,  schema is (Int,String)
    val testData = sqlContext.parquetFile("Data\\Test\\Parquet\\2")
    testData.foreach{x=>
      sqlContext.sql("INSERT INTO temp SELECT "+x(0)+" ,'"+x(1)+"'")
    }
    sqlContext.sql("select * from
temp").collect().map(x=>(x(0),x(1))).foreach(println)
  }

when I run the code above in local mode, it will not stop and do not have
error log. The lastest log is as follows:
14/12/30 11:07:44 WARN ParquetRecordReader: Can not initialize counter due
to context is not a instance of TaskInputOutputContext, but is
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
14/12/30 11:07:44 INFO InternalParquetRecordReader: RecordReader initialized
will read a total of 200 records.
14/12/30 11:07:44 INFO InternalParquetRecordReader: at row 0. reading next
block
14/12/30 11:07:44 INFO CodecPool: Got brand-new decompressor [.gz]
14/12/30 11:07:44 INFO InternalParquetRecordReader: block read in memory in
20 ms. row count = 200
14/12/30 11:07:45 INFO SparkContext: Starting job: runJob at
ParquetTableOperations.scala:325
14/12/30 11:07:45 INFO DAGScheduler: Got job 1 (runJob at
ParquetTableOperations.scala:325) with 1 output partitions
(allowLocal=false)
14/12/30 11:07:45 INFO DAGScheduler: Final stage: Stage 1(runJob at
ParquetTableOperations.scala:325)
14/12/30 11:07:45 INFO DAGScheduler: Parents of final stage: List()
14/12/30 11:07:45 INFO DAGScheduler: Missing parents: List()
14/12/30 11:07:45 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[6]
at mapPartitions at basicOperators.scala:43), which has no missing parents
14/12/30 11:07:45 INFO MemoryStore: ensureFreeSpace(53328) called with
curMem=239241, maxMem=1013836677
14/12/30 11:07:45 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 52.1 KB, free 966.6 MB)
14/12/30 11:07:45 INFO MemoryStore: ensureFreeSpace(31730) called with
curMem=292569, maxMem=1013836677
14/12/30 11:07:45 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes
in memory (estimated size 31.0 KB, free 966.6 MB)
14/12/30 11:07:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on localhost:52533 (size: 31.0 KB, free: 966.8 MB)
14/12/30 11:07:45 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0
14/12/30 11:07:45 INFO SparkContext: Created broadcast 2 from broadcast at
DAGScheduler.scala:838
14/12/30 11:07:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1
(MapPartitionsRDD[6] at mapPartitions at basicOperators.scala:43)
14/12/30 11:07:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

Can anyone give me a hand?

Thanks
evil



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/A-question-about-using-insert-into-in-rdd-foreach-in-spark-1-2-tp9959.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: A question about using insert into in rdd foreach in spark 1.2

Posted by Michael Armbrust <mi...@databricks.com>.
-dev +user

In general you cannot create new RDDs inside closures that run on the
executors (which is what sql inside of a foreach is doing).

I think what you want here is something like:

sqlContext.parquetFile("Data\\Test\\Parquet\\2").registerTempTable("temp2")
sql("SELECT col1, col2 FROM temp2").insertInto("temp")



On Mon, Dec 29, 2014 at 7:29 PM, evil <qi...@gmail.com> wrote:

> Hi All,
> I have a  problem when I try to use insert into in loop, and this is my
> code
> def main(args: Array[String]) {
>     //This is an empty table, schema is (Int,String)
>
>
> sqlContext.parquetFile("Data\\Test\\Parquet\\Temp").registerTempTable("temp")
>     //not empty table,  schema is (Int,String)
>     val testData = sqlContext.parquetFile("Data\\Test\\Parquet\\2")
>     testData.foreach{x=>
>       sqlContext.sql("INSERT INTO temp SELECT "+x(0)+" ,'"+x(1)+"'")
>     }
>     sqlContext.sql("select * from
> temp").collect().map(x=>(x(0),x(1))).foreach(println)
>   }
>
> when I run the code above in local mode, it will not stop and do not have
> error log. The lastest log is as follows:
> 14/12/30 11:07:44 WARN ParquetRecordReader: Can not initialize counter due
> to context is not a instance of TaskInputOutputContext, but is
> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
> 14/12/30 11:07:44 INFO InternalParquetRecordReader: RecordReader
> initialized
> will read a total of 200 records.
> 14/12/30 11:07:44 INFO InternalParquetRecordReader: at row 0. reading next
> block
> 14/12/30 11:07:44 INFO CodecPool: Got brand-new decompressor [.gz]
> 14/12/30 11:07:44 INFO InternalParquetRecordReader: block read in memory in
> 20 ms. row count = 200
> 14/12/30 11:07:45 INFO SparkContext: Starting job: runJob at
> ParquetTableOperations.scala:325
> 14/12/30 11:07:45 INFO DAGScheduler: Got job 1 (runJob at
> ParquetTableOperations.scala:325) with 1 output partitions
> (allowLocal=false)
> 14/12/30 11:07:45 INFO DAGScheduler: Final stage: Stage 1(runJob at
> ParquetTableOperations.scala:325)
> 14/12/30 11:07:45 INFO DAGScheduler: Parents of final stage: List()
> 14/12/30 11:07:45 INFO DAGScheduler: Missing parents: List()
> 14/12/30 11:07:45 INFO DAGScheduler: Submitting Stage 1
> (MapPartitionsRDD[6]
> at mapPartitions at basicOperators.scala:43), which has no missing parents
> 14/12/30 11:07:45 INFO MemoryStore: ensureFreeSpace(53328) called with
> curMem=239241, maxMem=1013836677
> 14/12/30 11:07:45 INFO MemoryStore: Block broadcast_2 stored as values in
> memory (estimated size 52.1 KB, free 966.6 MB)
> 14/12/30 11:07:45 INFO MemoryStore: ensureFreeSpace(31730) called with
> curMem=292569, maxMem=1013836677
> 14/12/30 11:07:45 INFO MemoryStore: Block broadcast_2_piece0 stored as
> bytes
> in memory (estimated size 31.0 KB, free 966.6 MB)
> 14/12/30 11:07:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
> on localhost:52533 (size: 31.0 KB, free: 966.8 MB)
> 14/12/30 11:07:45 INFO BlockManagerMaster: Updated info of block
> broadcast_2_piece0
> 14/12/30 11:07:45 INFO SparkContext: Created broadcast 2 from broadcast at
> DAGScheduler.scala:838
> 14/12/30 11:07:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage
> 1
> (MapPartitionsRDD[6] at mapPartitions at basicOperators.scala:43)
> 14/12/30 11:07:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
>
> Can anyone give me a hand?
>
> Thanks
> evil
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/A-question-about-using-insert-into-in-rdd-foreach-in-spark-1-2-tp9959.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>
>