You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "宿荣全 (JIRA)" <ji...@apache.org> on 2014/11/04 06:02:33 UTC
[jira] [Updated] (SPARK-3954) Optimization to FileInputDStream
[ https://issues.apache.org/jira/browse/SPARK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
宿荣全 updated SPARK-3954:
-----------------------
Description:
about convert files to RDDS there are 3 loops with files sequence in spark source.
loops files sequence:
1.files.map(...)
2.files.zip(fileRDDs)
3.files-size.foreach
It's will very time consuming when lots of files.So I do the following correction:
3 loops with files sequence => only one loop
spark source code:
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
files.zip(fileRDDs).foreach { case (file, rdd) => {
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
}
}}
new UnionRDD(context.sparkContext, fileRDDs)
}
// -----------------------------------------------------------------------------------
modified code:
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
}
rdd
}
new UnionRDD(context.sparkContext, fileRDDs)
}
was:
about convert files to RDDS there are 3 loops with files sequence in spark source.
loops files sequence:
1、files.map(...)
2、files.zip(fileRDDs)
3、files-size.foreach
It's will very time consuming when lots of files.So I do the following correction:
3 loops with files sequence => only one loop
spark source code:
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
files.zip(fileRDDs).foreach { case (file, rdd) => {
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
}
}}
new UnionRDD(context.sparkContext, fileRDDs)
}
// -----------------------------------------------------------------------------------
modified code:
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
"Refer to the streaming programming guide for more details.")
}
rdd
}
new UnionRDD(context.sparkContext, fileRDDs)
}
> Optimization to FileInputDStream
> --------------------------------
>
> Key: SPARK-3954
> URL: https://issues.apache.org/jira/browse/SPARK-3954
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.0.0, 1.1.0
> Reporter: 宿荣全
>
> about convert files to RDDS there are 3 loops with files sequence in spark source.
> loops files sequence:
> 1.files.map(...)
> 2.files.zip(fileRDDs)
> 3.files-size.foreach
> It's will very time consuming when lots of files.So I do the following correction:
> 3 loops with files sequence => only one loop
> spark source code:
> private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
> val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
> files.zip(fileRDDs).foreach { case (file, rdd) => {
> if (rdd.partitions.size == 0) {
> logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
> "files that have been \"moved\" to the directory assigned to the file stream. " +
> "Refer to the streaming programming guide for more details.")
> }
> }}
> new UnionRDD(context.sparkContext, fileRDDs)
> }
> // -----------------------------------------------------------------------------------
> modified code:
> private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
> val fileRDDs = for (file <- files; rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)) yield {
> if (rdd.partitions.size == 0) {
> logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
> "files that have been \"moved\" to the directory assigned to the file stream. " +
> "Refer to the streaming programming guide for more details.")
> }
> rdd
> }
> new UnionRDD(context.sparkContext, fileRDDs)
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org