You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/05 00:08:51 UTC

spark git commit: [SPARK-5426][SQL] Add SparkSQL Java API helper methods.

Repository: spark
Updated Branches:
  refs/heads/master b90dd3979 -> 424cb699e


[SPARK-5426][SQL] Add SparkSQL Java API helper methods.

Right now the PR adds a few helper methods for Java APIs. But the issue was opened mainly to get rid of transformations in the Java API like `.rdd` and `.toJavaRDD` while working with `SQLContext` or `HiveContext`.

Author: kul <ku...@gmail.com>

Closes #4243 from kul/master and squashes the following commits:

2390fba [kul] [SPARK-5426][SQL] Add SparkSQL Java API helper methods.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/424cb699
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/424cb699
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/424cb699

Branch: refs/heads/master
Commit: 424cb699ee9b091eb23b86dc018a86e377ad309f
Parents: b90dd39
Author: kul <ku...@gmail.com>
Authored: Wed Feb 4 15:08:37 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Feb 4 15:08:37 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala | 17 +++++++
 .../sql/api/java/JavaApplySchemaSuite.java      | 48 ++++++++++++++++++--
 2 files changed, 62 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/424cb699/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2697e78..1661282 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -221,6 +221,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     DataFrame(this, logicalPlan)
   }
 
+  @DeveloperApi
+  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+    applySchema(rowRDD.rdd, schema);
+  }
+
   /**
    * Applies a schema to an RDD of Java Beans.
    *
@@ -305,6 +310,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
 
+  def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0)
+
   /**
    * :: Experimental ::
    * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
@@ -323,6 +330,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     applySchema(rowRDD, appliedSchema)
   }
 
+  @Experimental
+  def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+    jsonRDD(json.rdd, schema)
+  }
+
   /**
    * :: Experimental ::
    */
@@ -337,6 +349,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   @Experimental
+  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+    jsonRDD(json.rdd, samplingRatio);
+  }
+
+  @Experimental
   def load(path: String): DataFrame = {
     val dataSourceName = conf.defaultDataSourceName
     load(dataSourceName, ("path", path))

http://git-wip-us.apache.org/repos/asf/spark/blob/424cb699/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
index badd00d..8510bac 100644
--- a/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/JavaApplySchemaSuite.java
@@ -98,7 +98,7 @@ public class JavaApplySchemaSuite implements Serializable {
     fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
     StructType schema = DataTypes.createStructType(fields);
 
-    DataFrame df = javaSqlCtx.applySchema(rowRDD.rdd(), schema);
+    DataFrame df = javaSqlCtx.applySchema(rowRDD, schema);
     df.registerTempTable("people");
     Row[] actual = javaSqlCtx.sql("SELECT * FROM people").collect();
 
@@ -109,6 +109,48 @@ public class JavaApplySchemaSuite implements Serializable {
     Assert.assertEquals(expected, Arrays.asList(actual));
   }
 
+
+
+  @Test
+  public void dataFrameRDDOperations() {
+    List<Person> personList = new ArrayList<Person>(2);
+    Person person1 = new Person();
+    person1.setName("Michael");
+    person1.setAge(29);
+    personList.add(person1);
+    Person person2 = new Person();
+    person2.setName("Yin");
+    person2.setAge(28);
+    personList.add(person2);
+
+    JavaRDD<Row> rowRDD = javaCtx.parallelize(personList).map(
+            new Function<Person, Row>() {
+              public Row call(Person person) throws Exception {
+                return RowFactory.create(person.getName(), person.getAge());
+              }
+            });
+
+    List<StructField> fields = new ArrayList<StructField>(2);
+    fields.add(DataTypes.createStructField("name", DataTypes.StringType, false));
+    fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
+    StructType schema = DataTypes.createStructType(fields);
+
+    DataFrame df = javaSqlCtx.applySchema(rowRDD, schema);
+    df.registerTempTable("people");
+    List<String> actual = javaSqlCtx.sql("SELECT * FROM people").toJavaRDD().map(new Function<Row, String>() {
+
+      public String call(Row row) {
+        return row.getString(0) + "_" + row.get(1).toString();
+      }
+    }).collect();
+
+    List<String> expected = new ArrayList<String>(2);
+    expected.add("Michael_29");
+    expected.add("Yin_28");
+
+    Assert.assertEquals(expected, actual);
+  }
+
   @Test
   public void applySchemaToJSON() {
     JavaRDD<String> jsonRDD = javaCtx.parallelize(Arrays.asList(
@@ -147,14 +189,14 @@ public class JavaApplySchemaSuite implements Serializable {
         null,
         "this is another simple string."));
 
-    DataFrame df1 = javaSqlCtx.jsonRDD(jsonRDD.rdd());
+    DataFrame df1 = javaSqlCtx.jsonRDD(jsonRDD);
     StructType actualSchema1 = df1.schema();
     Assert.assertEquals(expectedSchema, actualSchema1);
     df1.registerTempTable("jsonTable1");
     List<Row> actual1 = javaSqlCtx.sql("select * from jsonTable1").collectAsList();
     Assert.assertEquals(expectedResult, actual1);
 
-    DataFrame df2 = javaSqlCtx.jsonRDD(jsonRDD.rdd(), expectedSchema);
+    DataFrame df2 = javaSqlCtx.jsonRDD(jsonRDD, expectedSchema);
     StructType actualSchema2 = df2.schema();
     Assert.assertEquals(expectedSchema, actualSchema2);
     df2.registerTempTable("jsonTable2");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org