Posted to commits@spark.apache.org by li...@apache.org on 2016/03/30 11:33:32 UTC

spark git commit: [SPARK-14114][SQL] implement buildReader for text data source

Repository: spark
Updated Branches:
  refs/heads/master 7320f9bd1 -> 816f359cf


[SPARK-14114][SQL] implement buildReader for text data source

## What changes were proposed in this pull request?

This PR implements buildReader for the text data source and enables it in the new data source code path.
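
For context, a minimal usage sketch (the input path is hypothetical) that exercises this path: reading text files yields a DataFrame with a single StringType column named "value", and with buildReader in place each line is produced as one UnsafeRow by the new file-scan code path.

    // Minimal sketch, assuming a hypothetical input path.
    val logs = sqlContext.read.text("/path/to/logs")
    logs.filter(logs("value").contains("ERROR")).count()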

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #11934 from cloud-fan/text.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/816f359c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/816f359c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/816f359c

Branch: refs/heads/master
Commit: 816f359cf043ef719a0bc7df0506a3a830fff70d
Parents: 7320f9b
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Mar 30 17:32:53 2016 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Wed Mar 30 17:32:53 2016 +0800

----------------------------------------------------------------------
 .../datasources/FileSourceStrategy.scala        |  3 ++-
 .../datasources/text/DefaultSource.scala        | 28 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/816f359c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 20fda95..4448796 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -59,7 +59,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       if (files.fileFormat.toString == "TestFileFormat" ||
          files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
          files.fileFormat.toString == "ORC" ||
-         files.fileFormat.isInstanceOf[json.DefaultSource]) &&
+         files.fileFormat.isInstanceOf[json.DefaultSource] ||
+         files.fileFormat.isInstanceOf[text.DefaultSource]) &&
          files.sqlContext.conf.useFileScan =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:

http://git-wip-us.apache.org/repos/asf/spark/blob/816f359c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 5cfc9e9..d6ab5fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
@@ -30,7 +31,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.datasources.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -125,6 +126,31 @@ class DefaultSource extends FileFormat with DataSourceRegister {
           }
         }
   }
+
+  override def buildReader(
+      sqlContext: SQLContext,
+      partitionSchema: StructType,
+      dataSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val broadcastedConf =
+      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+    file => {
+      val unsafeRow = new UnsafeRow(1)
+      val bufferHolder = new BufferHolder(unsafeRow)
+      val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1)
+
+      new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+        // Writes to an UnsafeRow directly
+        bufferHolder.reset()
+        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+        unsafeRow.setTotalSize(bufferHolder.totalSize())
+        unsafeRow
+      }
+    }
+  }
 }
 
 class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
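
Not part of the commit; a simplified, hypothetical sketch of the contract the strategy relies on: buildReader returns a closure from a file split to an iterator of rows, and the scan applies that closure lazily to every split in a partition. Note that the text reader above reuses a single UnsafeRow and BufferHolder per file, so each row is overwritten by the next; consumers that retain rows must copy them.

    // Hypothetical helper, not Spark code. Think of F as PartitionedFile and
    // R as InternalRow; readFile is the closure returned by buildReader.
    object ReaderContractSketch {
      def scanPartition[F, R](
          files: Seq[F],
          readFile: F => Iterator[R]): Iterator[R] = {
        // Apply the per-file reader to each split and concatenate the rows lazily.
        files.iterator.flatMap(readFile)
      }
    }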


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org