Posted to user@spark.apache.org by Brandon White <bw...@gmail.com> on 2015/07/10 20:55:51 UTC

Spark Streaming - Inserting into Tables

Why does this not work? Is insertInto broken in 1.3.1? It does not throw
any errors or exceptions and does not fail; it simply does not work.


val ssc = new StreamingContext(sc, Minutes(10))

val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")

dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
parquetFile.registerTempTable("rideaccepted")

currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}

ssc.start()


Or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory")
val day = sqlContext.jsonFile("s3://textFileDirectory")
day.registerTempTable("rideaccepted")


currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.registerTempTable("tmp_rideaccepted")
  sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
}

ssc.start()


or this?

val ssc = new StreamingContext(sc, Minutes(10))

val currentStream = ssc.textFileStream(s"s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile(s"s3://textFileDirectory/")

dayBefore.registerTempTable("rideaccepted")

currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}

ssc.start()
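
For comparison, here is a variant of the same pipeline that sidesteps insertInto entirely and appends each micro-batch straight to a Parquet directory. This is only a sketch against the 1.3 DataFrame API; the output path is made up, and the empty-batch check assumes `RDD.isEmpty` (added in 1.3.0):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")

currentStream.foreachRDD { rdd =>
  // Skip empty micro-batches so we do not write empty Parquet parts.
  if (!rdd.isEmpty()) {
    val df = sqlContext.jsonRDD(rdd)
    // Append the micro-batch to the Parquet directory directly,
    // rather than inserting into a registered temp table.
    df.save("/tmp/cache/rideaccepted.parquet", SaveMode.Append)
  }
}

ssc.start()
```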

Re: Spark Streaming - Inserting into Tables

Posted by Tathagata Das <td...@databricks.com>.
Why is .remember not ideal?
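
For context, the workaround being discussed is a single call on the StreamingContext; a sketch, with an illustrative retention window (the thread does not say which duration was used):

```scala
import org.apache.spark.streaming.{Minutes, StreamingContext}

val ssc = new StreamingContext(sc, Minutes(10))

// Keep each batch's RDDs around longer than the default, so that work
// launched outside the normal DStream lifecycle (such as SQL queries
// over data inserted in foreachRDD) can still see them before cleanup.
ssc.remember(Minutes(60))
```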


On Sun, Jul 12, 2015 at 7:22 PM, Brandon White <bw...@gmail.com>
wrote:

> Hi Yin,
>
> Yes there were no new rows. I fixed it by doing a .remember on the
> context. Obviously, this is not ideal.
>
> On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai <yh...@databricks.com> wrote:
>
>> Hi Brandon,
>>
>> Can you explain what you meant by "It simply does not work"? You did
>> not see new data files?
>>
>> Thanks,
>>
>> Yin
>>
>> [original message snipped]

Re: Spark Streaming - Inserting into Tables

Posted by Brandon White <bw...@gmail.com>.
Hi Yin,

Yes there were no new rows. I fixed it by doing a .remember on the context.
Obviously, this is not ideal.

On Sun, Jul 12, 2015 at 6:31 PM, Yin Huai <yh...@databricks.com> wrote:

> Hi Brandon,
>
> Can you explain what you meant by "It simply does not work"? You did
> not see new data files?
>
> Thanks,
>
> Yin
>
> On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bw...@gmail.com>
> wrote:
>
>> [original message snipped]
>

Re: Spark Streaming - Inserting into Tables

Posted by Yin Huai <yh...@databricks.com>.
Hi Brandon,

Can you explain what you meant by "It simply does not work"? You did not
see new data files?

Thanks,

Yin

On Fri, Jul 10, 2015 at 11:55 AM, Brandon White <bw...@gmail.com>
wrote:

> [original message snipped]