Posted to user@spark.apache.org by Jean Georges Perrin <jg...@jgp.net> on 2017/03/22 19:27:59 UTC

Custom Spark data source in Java

Hi,

I am trying to build a custom file data source for Spark, in Java. I have found numerous examples in Scala (including the CSV and XML data sources from Databricks), but I cannot bring Scala into this project. We also already have the parser itself written in Java; I just need to build the "glue" between the parser and Spark.

This is how I'd like to call it:

    String filename = "src/test/resources/simple.x";

    SparkSession spark = SparkSession.builder().appName("X-parse").master("local").getOrCreate();

    Dataset<Row> df = spark.read().format("x.RandomDataSource")
            .option("metadataTag", "schema") // hint to find schema
            .option("dataTag", "data") // hint to find data
            .load(filename); // local file
So far, I have tried to implement x.RandomDataSource:

	• Based on FileFormat, which makes the most sense, but I do not have a clue on how to build buildReader()...
	• Based on RelationProvider, but same here...

It seems that in both cases, the call is made to the right class, but I get an NPE because I do not provide much. Any hint or example would be greatly appreciated!

Thanks

jg

Re: Custom Spark data source in Java

Posted by Jörn Franke <jo...@gmail.com>.
Ok, I understand. For 1), as a minimum you need to implement inferSchema and
buildReader. inferSchema must return the schema of a row. For example, if
it contains one column of type String, it returns:
StructType(collection.immutable.Seq(StructField("column1", StringType,
true)))
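
In Java, a minimal sketch of building that same one-column schema, without
touching Scala collections, might look like this (assuming Spark 2.x; the
column name is only an example):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// One nullable String column, equivalent to the Scala snippet above.
StructType schema = new StructType().add("column1", DataTypes.StringType, true);
// inferSchema expects a Scala Option, so wrap the schema before returning it:
scala.Option<StructType> inferred = scala.Option.apply(schema);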

buildReader: here is an article on how to create a Function1 in Java:
http://www.codecommit.com/blog/java/interop-between-java-and-scala

It basically returns a function that takes a file as input and returns the
rows as output (Iterator). Btw, for better readability I would recommend
Java 8 lambda functions instead of Function1 etc.; this would also look
much more similar to Scala, but is fully Java compliant.

You can find an example in Scala here:
https://github.com/ZuInnoTe/spark-hadoopoffice-ds/blob/master/src/main/scala/org/zuinnote/spark/office/excel/DefaultSource.scala
It is a little bit more complex, because it returns for each row an array
that contains elements of a complex type (an Excel cell).
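
As a rough Java sketch of that interop (illustrative only, assuming Spark 2.x
and Scala 2.11 on the classpath; the class name and body are placeholders):
extending scala.runtime.AbstractFunction1 spares you from writing Function1's
compose/andThen plumbing by hand.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.runtime.AbstractFunction1;

// Hypothetical reader function; buildReader would return an instance of this.
public class XFileReader extends AbstractFunction1<PartitionedFile, Iterator<InternalRow>>
        implements java.io.Serializable {

    @Override
    public Iterator<InternalRow> apply(PartitionedFile file) {
        // file.filePath() identifies the split to parse; plug the Java parser in here.
        java.util.List<InternalRow> rows = new java.util.ArrayList<>();
        return JavaConverters.asScalaIteratorConverter(rows.iterator()).asScala();
    }
}

Whether a plain Java 8 lambda can stand in for Function1 depends on the Scala
version on the classpath, hence the explicit class in this sketch.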

For 2), it is in fact similar. You have to create a class that inherits from
BaseRelation and implements TableScan. There you need to implement schema
and buildScan. Then you simply ;-) return an object of this class.
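
In Java, a rough sketch of such a relation (again illustrative, assuming
Spark 2.x; the class name is made up, and the single hard-coded String row is
only there to prove the plumbing):

import java.util.Collections;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.sources.BaseRelation;
import org.apache.spark.sql.sources.TableScan;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SingleStringRelation extends BaseRelation implements TableScan {

    private final SQLContext sqlContext;

    public SingleStringRelation(SQLContext sqlContext) {
        this.sqlContext = sqlContext;
    }

    @Override
    public SQLContext sqlContext() {
        return sqlContext;
    }

    @Override
    public StructType schema() {
        // One nullable String column, matching the inferSchema example above.
        return new StructType().add("column1", DataTypes.StringType, true);
    }

    @Override
    public RDD<Row> buildScan() {
        // A single hard-coded row; the output of the real parser would go here.
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sqlContext.sparkContext());
        return jsc.parallelize(Collections.singletonList(RowFactory.create("hello"))).rdd();
    }
}

The RelationProvider's createRelation would then simply return
new SingleStringRelation(sqlContext).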

Here is another example in Scala:
https://github.com/ZuInnoTe/spark-hadoopcryptoledger-ds/blob/master/src/main/scala/org/zuinnote/spark/bitcoin/block/BitcoinBlockRelation.scala

Sorry it is again a little bit more complex, because it returns Bitcoin
blocks from the blockchain...

I hope it helps as a start. Let me know if you have more questions.


Re: Custom Spark data source in Java

Posted by Jean Georges Perrin <jg...@jgp.net>.
Thanks Jörn,

I tried to super simplify my project so I can focus on the plumbing and I will add the existing code & library later. So, as of now, the project will not have a lot of meaning but will allow me to understand the job.

my call is:

String filename = "src/test/resources/simple.json";
SparkSession spark = SparkSession.builder().appName("X-parse").master("local").getOrCreate();
Dataset<Row> df = spark.read().format("x.CharCounterDataSource")
	.option("char", "a") // count the number of 'a'
	.load(filename); // local file (line 40 in the stacks below)
df.show();

Ideally, this should display something like:

+--+
| a|
+--+
|45|
+--+

Things get trickier when I try to work on x.CharCounterDataSource:

I looked at 2 ways to do it:

1) one based on FileFormat:

public class CharCounterDataSource implements FileFormat {

	@Override
	public Function1<PartitionedFile, Iterator<InternalRow>> buildReader(SparkSession arg0, StructType arg1,
			StructType arg2, StructType arg3, Seq<Filter> arg4, Map<String, String> arg5, Configuration arg6) {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public Function1<PartitionedFile, Iterator<InternalRow>> buildReaderWithPartitionValues(SparkSession arg0,
			StructType arg1, StructType arg2, StructType arg3, Seq<Filter> arg4, Map<String, String> arg5,
			Configuration arg6) {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public Option<StructType> inferSchema(SparkSession arg0, Map<String, String> arg1, Seq<FileStatus> arg2) {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public boolean isSplitable(SparkSession arg0, Map<String, String> arg1, Path arg2) {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public OutputWriterFactory prepareWrite(SparkSession arg0, Job arg1, Map<String, String> arg2, StructType arg3) {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public boolean supportBatch(SparkSession arg0, StructType arg1) {
		// TODO Auto-generated method stub
		return false;
	}
}

I know it is an empty class (generated by Eclipse) and I am not expecting much out of it.

Running it says:

java.lang.NullPointerException
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
	at x.spark.datasource.counter.CharCounterDataSourceTest.test(CharCounterDataSourceTest.java:40)

Nothing surprising...
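
One plausible reading of that trace: getOrInferFileFormatSchema presumably
trips over the null coming back from inferSchema, so the minimal next step is
to return a real schema wrapped in a Scala Option. A sketch of just that
method (assuming Spark 2.x; the single String column is a placeholder, and
StructType/DataTypes come from org.apache.spark.sql.types):

@Override
public Option<StructType> inferSchema(SparkSession spark, Map<String, String> options, Seq<FileStatus> files) {
    // Return a non-null Option holding an actual schema instead of null.
    StructType schema = new StructType().add("column1", DataTypes.StringType, true);
    return Option.apply(schema);
}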

2) One based on RelationProvider:

public class CharCounterDataSource implements RelationProvider {

	@Override
	public BaseRelation createRelation(SQLContext arg0, Map<String, String> arg1) {
		// TODO Auto-generated method stub
		return null;
	}

}

which fails too...

java.lang.NullPointerException
	at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
	at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:389)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
	at x.CharCounterDataSourceTest.test(CharCounterDataSourceTest.java:40)


Don't get me wrong - I understand it fails - but what I need is "just one hint" to continue building the glue ;-)...
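
One such hint, as a sketch: the NPE in LogicalRelation's constructor
presumably comes from the null BaseRelation returned by createRelation, so
the minimal next step is to return any non-null relation. Assuming a
hypothetical relation class along the lines Jörn describes above (extending
BaseRelation and implementing TableScan):

@Override
public BaseRelation createRelation(SQLContext sqlContext, Map<String, String> parameters) {
    // SingleStringRelation is the hypothetical BaseRelation + TableScan sketched above.
    return new SingleStringRelation(sqlContext);
}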

(Un)fortunately, we cannot use Scala...

jg



Re: Custom Spark data source in Java

Posted by Jörn Franke <jo...@gmail.com>.
I think you can develop a Spark data source in Java, but you are right that most people use Scala for the glue to Spark even if they have a Java library (this is what I did for the project I open sourced). Coming back to your question, it is a little bit difficult to assess the exact issue without the code.
You could also try to first have a very simple Scala data source that works, then translate it to Java and test it there. You could then also post the code here without disclosing confidential stuff.
Or you could try directly in Java a data source that always returns a row with one column containing a String. I fear that in any case you need to import some Scala classes in Java and/or have some wrappers in Scala.
If you use FileFormat, note that you need at least Spark 2.0.
