Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/30 09:20:04 UTC
[2/3] [SPARK-2179][SQL] Public API for DataTypes and Schema
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DecimalType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DecimalType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DecimalType.java
new file mode 100644
index 0000000..9250491
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DecimalType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing java.math.BigDecimal values.
+ *
+ * {@code DecimalType} is represented by the singleton object {@link DataType#DecimalType}.
+ */
+public class DecimalType extends DataType {
+ protected DecimalType() {}
+}
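Note that the constructor is protected: like the other primitive type classes in this package, DecimalType is meant to be obtained through its singleton on DataType, as the javadoc's {@link DataType#DecimalType} reference indicates. A minimal Java sketch, assuming the singletons are exposed as public static final fields of DataType (as those references suggest):

    import org.apache.spark.sql.api.java.types.DataType;

    public class TypeSingletons {
      public static void main(String[] args) {
        // The protected constructors rule out direct instantiation;
        // grab the shared singleton instances instead.
        DataType decimal = DataType.DecimalType;
        DataType str = DataType.StringType;
        System.out.println(decimal == DataType.DecimalType); // true: shared instance
        System.out.println(str == DataType.StringType);      // true: shared instance
      }
    }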
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DoubleType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DoubleType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DoubleType.java
new file mode 100644
index 0000000..3e86917
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DoubleType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing double and Double values.
+ *
+ * {@code DoubleType} is represented by the singleton object {@link DataType#DoubleType}.
+ */
+public class DoubleType extends DataType {
+ protected DoubleType() {}
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/FloatType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/FloatType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/FloatType.java
new file mode 100644
index 0000000..fa860d4
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/FloatType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing float and Float values.
+ *
+ * {@code FloatType} is represented by the singleton object {@link DataType#FloatType}.
+ */
+public class FloatType extends DataType {
+ protected FloatType() {}
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/IntegerType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/IntegerType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/IntegerType.java
new file mode 100644
index 0000000..bd973ec
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/IntegerType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing int and Integer values.
+ *
+ * {@code IntegerType} is represented by the singleton object {@link DataType#IntegerType}.
+ */
+public class IntegerType extends DataType {
+ protected IntegerType() {}
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/LongType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/LongType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/LongType.java
new file mode 100644
index 0000000..e002333
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/LongType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing long and Long values.
+ *
+ * {@code LongType} is represented by the singleton object {@link DataType#LongType}.
+ */
+public class LongType extends DataType {
+ protected LongType() {}
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/MapType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/MapType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/MapType.java
new file mode 100644
index 0000000..94936e2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/MapType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing Maps. A MapType object comprises three fields,
+ * {@code DataType keyType}, {@code DataType valueType}, and {@code boolean valueContainsNull}.
+ * The field of {@code keyType} specifies the data type of keys in the map.
+ * The field of {@code valueType} specifies the data type of values in the map.
+ * The field of {@code valueContainsNull} specifies whether values of the map can be
+ * {@code null}.
+ * For values of a MapType column, keys are not allowed to be {@code null}.
+ *
+ * To create a {@link MapType},
+ * {@link org.apache.spark.sql.api.java.types.DataType#createMapType(DataType, DataType)} or
+ * {@link org.apache.spark.sql.api.java.types.DataType#createMapType(DataType, DataType, boolean)}
+ * should be used.
+ */
+public class MapType extends DataType {
+ private DataType keyType;
+ private DataType valueType;
+ private boolean valueContainsNull;
+
+ protected MapType(DataType keyType, DataType valueType, boolean valueContainsNull) {
+ this.keyType = keyType;
+ this.valueType = valueType;
+ this.valueContainsNull = valueContainsNull;
+ }
+
+ public DataType getKeyType() {
+ return keyType;
+ }
+
+ public DataType getValueType() {
+ return valueType;
+ }
+
+ public boolean isValueContainsNull() {
+ return valueContainsNull;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MapType mapType = (MapType) o;
+
+ if (valueContainsNull != mapType.valueContainsNull) return false;
+ if (!keyType.equals(mapType.keyType)) return false;
+ if (!valueType.equals(mapType.valueType)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = keyType.hashCode();
+ result = 31 * result + valueType.hashCode();
+ result = 31 * result + (valueContainsNull ? 1 : 0);
+ return result;
+ }
+}
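As the javadoc above says, MapType values come from the createMapType factory methods rather than the protected constructor. A minimal Java sketch (the two-argument overload presumably supplies a default for valueContainsNull; the three-argument form below is explicit):

    import org.apache.spark.sql.api.java.types.DataType;
    import org.apache.spark.sql.api.java.types.MapType;

    public class MapTypeExample {
      public static void main(String[] args) {
        // A map from String keys to Integer values whose values may be null.
        MapType scores =
          DataType.createMapType(DataType.StringType, DataType.IntegerType, true);
        System.out.println(scores.getKeyType() == DataType.StringType); // true
        System.out.println(scores.isValueContainsNull());               // true
      }
    }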
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ShortType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ShortType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ShortType.java
new file mode 100644
index 0000000..98f9507
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ShortType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing short and Short values.
+ *
+ * {@code ShortType} is represented by the singleton object {@link DataType#ShortType}.
+ */
+public class ShortType extends DataType {
+ protected ShortType() {}
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StringType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StringType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StringType.java
new file mode 100644
index 0000000..b8e7dbe
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StringType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing String values.
+ *
+ * {@code StringType} is represented by the singleton object {@link DataType#StringType}.
+ */
+public class StringType extends DataType {
+ protected StringType() {}
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructField.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructField.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructField.java
new file mode 100644
index 0000000..54e9c11
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructField.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * A StructField object represents a field in a StructType object.
+ * A StructField object comprises three fields, {@code String name}, {@code DataType dataType},
+ * and {@code boolean nullable}. The field of {@code name} is the name of a StructField.
+ * The field of {@code dataType} specifies the data type of a StructField.
+ * The field of {@code nullable} specifies if values of a StructField can contain {@code null}
+ * values.
+ *
+ * To create a {@link StructField},
+ * {@link org.apache.spark.sql.api.java.types.DataType#createStructField(String, DataType, boolean)}
+ * should be used.
+ */
+public class StructField {
+ private String name;
+ private DataType dataType;
+ private boolean nullable;
+
+ protected StructField(String name, DataType dataType, boolean nullable) {
+ this.name = name;
+ this.dataType = dataType;
+ this.nullable = nullable;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public boolean isNullable() {
+ return nullable;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ StructField that = (StructField) o;
+
+ if (nullable != that.nullable) return false;
+ if (!dataType.equals(that.dataType)) return false;
+ if (!name.equals(that.name)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + dataType.hashCode();
+ result = 31 * result + (nullable ? 1 : 0);
+ return result;
+ }
+}
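A StructField is likewise built through the factory method named in the javadoc. A short Java sketch:

    import org.apache.spark.sql.api.java.types.DataType;
    import org.apache.spark.sql.api.java.types.StructField;

    public class StructFieldExample {
      public static void main(String[] args) {
        // A non-nullable "name" column and a nullable "age" column.
        StructField name =
          DataType.createStructField("name", DataType.StringType, false);
        StructField age =
          DataType.createStructField("age", DataType.IntegerType, true);
        System.out.println(name.getName() + " nullable=" + name.isNullable());
        System.out.println(age.getName() + " nullable=" + age.isNullable());
      }
    }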
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructType.java
new file mode 100644
index 0000000..33a42f4
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The data type representing Rows.
+ * A StructType object comprises an array of StructFields.
+ *
+ * To create a {@link StructType},
+ * {@link org.apache.spark.sql.api.java.types.DataType#createStructType(java.util.List)} or
+ * {@link org.apache.spark.sql.api.java.types.DataType#createStructType(StructField[])}
+ * should be used.
+ */
+public class StructType extends DataType {
+ private StructField[] fields;
+
+ protected StructType(StructField[] fields) {
+ this.fields = fields;
+ }
+
+ public StructField[] getFields() {
+ return fields;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ StructType that = (StructType) o;
+
+ if (!Arrays.equals(fields, that.fields)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(fields);
+ }
+}
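Putting the two together, a schema is a StructType built from StructFields via createStructType; a brief Java sketch using the List overload cited in the javadoc:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.sql.api.java.types.DataType;
    import org.apache.spark.sql.api.java.types.StructField;
    import org.apache.spark.sql.api.java.types.StructType;

    public class StructTypeExample {
      public static void main(String[] args) {
        List<StructField> fields = Arrays.asList(
          DataType.createStructField("name", DataType.StringType, false),
          DataType.createStructField("age", DataType.IntegerType, true));
        StructType schema = DataType.createStructType(fields);
        // getFields() exposes the underlying StructField array.
        System.out.println(schema.getFields().length); // 2
      }
    }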
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/TimestampType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/TimestampType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/TimestampType.java
new file mode 100644
index 0000000..6529577
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/TimestampType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing java.sql.Timestamp values.
+ *
+ * {@code TimestampType} is represented by the singleton object {@link DataType#TimestampType}.
+ */
+public class TimestampType extends DataType {
+ protected TimestampType() {}
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/package-info.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/package-info.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/package-info.java
new file mode 100644
index 0000000..f169ac6
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Allows users to get and create Spark SQL data types.
+ */
+package org.apache.spark.sql.api.java.types;
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e4b6810..8633875 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
@@ -89,6 +88,44 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
/**
+ * :: DeveloperApi ::
+ * Creates a [[SchemaRDD]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
+ * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
+ * the provided schema. Otherwise, a runtime exception will be thrown.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ * val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+ *
+ * val schema =
+ * StructType(
+ * StructField("name", StringType, false) ::
+ * StructField("age", IntegerType, true) :: Nil)
+ *
+ * val people =
+ * sc.textFile("examples/src/main/resources/people.txt").map(
+ * _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
+ * val peopleSchemaRDD = sqlContext.applySchema(people, schema)
+ * peopleSchemaRDD.printSchema
+ * // root
+ * // |-- name: string (nullable = false)
+ * // |-- age: integer (nullable = true)
+ *
+ * peopleSchemaRDD.registerAsTable("people")
+ * sqlContext.sql("select name from people").collect.foreach(println)
+ * }}}
+ *
+ * @group userf
+ */
+ @DeveloperApi
+ def applySchema(rowRDD: RDD[Row], schema: StructType): SchemaRDD = {
+ // TODO: use MutableProjection when rowRDD is another SchemaRDD and the applied
+ // schema differs from the existing schema on any field data type.
+ val logicalPlan = SparkLogicalPlan(ExistingRdd(schema.toAttributes, rowRDD))(self)
+ new SchemaRDD(this, logicalPlan)
+ }
+
+ /**
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
*
* @group userf
@@ -106,6 +143,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
+ * Loads a JSON file (one object per line) and applies the given schema,
+ * returning the result as a [[SchemaRDD]].
+ *
+ * @group userf
+ */
+ @Experimental
+ def jsonFile(path: String, schema: StructType): SchemaRDD = {
+ val json = sparkContext.textFile(path)
+ jsonRDD(json, schema)
+ }
+
+ /**
+ * :: Experimental ::
*/
@Experimental
def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
@@ -124,10 +174,28 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
+ * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+ * returning the result as a [[SchemaRDD]].
+ *
+ * @group userf
+ */
+ @Experimental
+ def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
+ val appliedSchema =
+ Option(schema).getOrElse(JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, 1.0)))
+ val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+ applySchema(rowRDD, appliedSchema)
+ }
+
+ /**
+ * :: Experimental ::
*/
@Experimental
- def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
- new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))
+ def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
+ val appliedSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
+ val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+ applySchema(rowRDD, appliedSchema)
+ }
/**
* :: Experimental ::
@@ -345,70 +413,138 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Peek at the first row of the RDD and infer its schema.
- * TODO: consolidate this with the type system developed in SPARK-2060.
+ * It is only used by PySpark.
*/
private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
import scala.collection.JavaConversions._
- def typeFor(obj: Any): DataType = obj match {
- case c: java.lang.String => StringType
- case c: java.lang.Integer => IntegerType
- case c: java.lang.Long => LongType
- case c: java.lang.Double => DoubleType
- case c: java.lang.Boolean => BooleanType
- case c: java.math.BigDecimal => DecimalType
- case c: java.sql.Timestamp => TimestampType
+
+ def typeOfComplexValue: PartialFunction[Any, DataType] = {
case c: java.util.Calendar => TimestampType
- case c: java.util.List[_] => ArrayType(typeFor(c.head))
+ case c: java.util.List[_] =>
+ ArrayType(typeOfObject(c.head))
case c: java.util.Map[_, _] =>
val (key, value) = c.head
- MapType(typeFor(key), typeFor(value))
+ MapType(typeOfObject(key), typeOfObject(value))
case c if c.getClass.isArray =>
val elem = c.asInstanceOf[Array[_]].head
- ArrayType(typeFor(elem))
+ ArrayType(typeOfObject(elem))
case c => throw new Exception(s"Object of type $c cannot be used")
}
+ def typeOfObject = ScalaReflection.typeOfObject orElse typeOfComplexValue
+
val firstRow = rdd.first()
- val schema = firstRow.map { case (fieldName, obj) =>
- AttributeReference(fieldName, typeFor(obj), true)()
+ val fields = firstRow.map {
+ case (fieldName, obj) => StructField(fieldName, typeOfObject(obj), true)
}.toSeq
- def needTransform(obj: Any): Boolean = obj match {
- case c: java.util.List[_] => true
- case c: java.util.Map[_, _] => true
- case c if c.getClass.isArray => true
- case c: java.util.Calendar => true
- case c => false
+ applySchemaToPythonRDD(rdd, StructType(fields))
+ }
+
+ /**
+ * Parses the data type in our internal string representation. The data type string should
+ * have the same format as the one generated by `toString` in scala.
+ * It is only used by PySpark.
+ */
+ private[sql] def parseDataType(dataTypeString: String): DataType = {
+ val parser = org.apache.spark.sql.catalyst.types.DataType
+ parser(dataTypeString)
+ }
+
+ /**
+ * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
+ */
+ private[sql] def applySchemaToPythonRDD(
+ rdd: RDD[Map[String, _]],
+ schemaString: String): SchemaRDD = {
+ val schema = parseDataType(schemaString).asInstanceOf[StructType]
+ applySchemaToPythonRDD(rdd, schema)
+ }
+
+ /**
+ * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
+ */
+ private[sql] def applySchemaToPythonRDD(
+ rdd: RDD[Map[String, _]],
+ schema: StructType): SchemaRDD = {
+ // TODO: We should have a better implementation once we do not turn a Python side record
+ // to a Map.
+ import scala.collection.JavaConversions._
+ import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
+
+ def needsConversion(dataType: DataType): Boolean = dataType match {
+ case ByteType => true
+ case ShortType => true
+ case FloatType => true
+ case TimestampType => true
+ case ArrayType(_, _) => true
+ case MapType(_, _, _) => true
+ case StructType(_) => true
+ case other => false
}
- // convert JList, JArray into Seq, convert JMap into Map
- // convert Calendar into Timestamp
- def transform(obj: Any): Any = obj match {
- case c: java.util.List[_] => c.map(transform).toSeq
- case c: java.util.Map[_, _] => c.map {
- case (key, value) => (key, transform(value))
- }.toMap
- case c if c.getClass.isArray =>
- c.asInstanceOf[Array[_]].map(transform).toSeq
- case c: java.util.Calendar =>
- new java.sql.Timestamp(c.getTime().getTime())
- case c => c
+ // Converts value to the type specified by the data type.
+ // Because Python does not have data types for TimestampType, FloatType, ShortType, and
+ // ByteType, we need to explicitly convert values in columns of these data types to the desired
+ // JVM data types.
+ def convert(obj: Any, dataType: DataType): Any = (obj, dataType) match {
+ // TODO: We should check nullable
+ case (null, _) => null
+
+ case (c: java.util.List[_], ArrayType(elementType, _)) =>
+ val converted = c.map { e => convert(e, elementType)}
+ JListWrapper(converted)
+
+ case (c: java.util.Map[_, _], struct: StructType) =>
+ val row = new GenericMutableRow(struct.fields.length)
+ struct.fields.zipWithIndex.foreach {
+ case (field, i) =>
+ val value = convert(c.get(field.name), field.dataType)
+ row.update(i, value)
+ }
+ row
+
+ case (c: java.util.Map[_, _], MapType(keyType, valueType, _)) =>
+ val converted = c.map {
+ case (key, value) =>
+ (convert(key, keyType), convert(value, valueType))
+ }
+ JMapWrapper(converted)
+
+ case (c, ArrayType(elementType, _)) if c.getClass.isArray =>
+ val converted = c.asInstanceOf[Array[_]].map(e => convert(e, elementType))
+ converted: Seq[Any]
+
+ case (c: java.util.Calendar, TimestampType) => new java.sql.Timestamp(c.getTime().getTime())
+ case (c: Int, ByteType) => c.toByte
+ case (c: Int, ShortType) => c.toShort
+ case (c: Double, FloatType) => c.toFloat
+
+ case (c, _) => c
+ }
+
+ val convertedRdd = if (schema.fields.exists(f => needsConversion(f.dataType))) {
+ rdd.map(m => m.map { case (key, value) => (key, convert(value, schema(key).dataType)) })
+ } else {
+ rdd
}
- val need = firstRow.exists {case (key, value) => needTransform(value)}
- val transformed = if (need) {
- rdd.mapPartitions { iter =>
- iter.map {
- m => m.map {case (key, value) => (key, transform(value))}
+ val rowRdd = convertedRdd.mapPartitions { iter =>
+ val row = new GenericMutableRow(schema.fields.length)
+ val fieldsWithIndex = schema.fields.zipWithIndex
+ iter.map { m =>
+ // We cannot use m.values because the order of values returned by m.values may not
+ // match the field order.
+ fieldsWithIndex.foreach {
+ case (field, i) =>
+ val value =
+ m.get(field.name).flatMap(v => Option(v)).map(v => convert(v, field.dataType)).orNull
+ row.update(i, value)
}
- }
- } else rdd
- val rowRdd = transformed.mapPartitions { iter =>
- iter.map { map =>
- new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
+ row: Row
}
}
- new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
- }
+ new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema.toAttributes, rowRdd))(self))
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 172b6e0..420f21f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import java.util.{Map => JMap, List => JList, Set => JSet}
+import java.util.{Map => JMap, List => JList}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.{DataType, ArrayType, BooleanType, StructType, MapType}
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
import org.apache.spark.api.java.JavaRDD
@@ -120,6 +119,11 @@ class SchemaRDD(
override protected def getDependencies: Seq[Dependency[_]] =
List(new OneToOneDependency(queryExecution.toRdd))
+ /** Returns the schema of this SchemaRDD (represented by a [[StructType]]).
+ *
+ * @group schema
+ */
+ def schema: StructType = queryExecution.analyzed.schema
// =======================================================================
// Query DSL
@@ -376,6 +380,8 @@ class SchemaRDD(
* Converts a JavaRDD to a PythonRDD. It is used by pyspark.
*/
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+ import scala.collection.Map
+
def toJava(obj: Any, dataType: DataType): Any = dataType match {
case struct: StructType => rowToMap(obj.asInstanceOf[Row], struct)
case array: ArrayType => obj match {
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index fd75103..6a20def 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -123,9 +123,15 @@ private[sql] trait SchemaRDDLike {
def saveAsTable(tableName: String): Unit =
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
- /** Returns the output schema in the tree format. */
- def schemaString: String = queryExecution.analyzed.schemaString
+ /** Returns the schema as a string in the tree format.
+ *
+ * @group schema
+ */
+ def schemaString: String = baseSchemaRDD.schema.treeString
- /** Prints out the schema in the tree format. */
+ /** Prints out the schema.
+ *
+ * @group schema
+ */
def printSchema(): Unit = println(schemaString)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 85726ba..c1c18a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -21,14 +21,16 @@ import java.beans.Introspector
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.sql.api.java.types.{StructType => JStructType}
import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
-import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.sql.types.util.DataTypeConversions
+import DataTypeConversions.asScalaDataType
import org.apache.spark.util.Utils
/**
@@ -96,6 +98,21 @@ class JavaSQLContext(val sqlContext: SQLContext) {
}
/**
+ * :: DeveloperApi ::
+ * Creates a JavaSchemaRDD from an RDD containing Rows by applying a schema to this RDD.
+ * It is important to make sure that the structure of every Row of the provided RDD matches the
+ * provided schema. Otherwise, a runtime exception will be thrown.
+ */
+ @DeveloperApi
+ def applySchema(rowRDD: JavaRDD[Row], schema: JStructType): JavaSchemaRDD = {
+ val scalaRowRDD = rowRDD.rdd.map(r => r.row)
+ val scalaSchema = asScalaDataType(schema).asInstanceOf[StructType]
+ val logicalPlan =
+ SparkLogicalPlan(ExistingRdd(scalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+ new JavaSchemaRDD(sqlContext, logicalPlan)
+ }
+
+ /**
* Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
*/
def parquetFile(path: String): JavaSchemaRDD =
@@ -104,23 +121,49 @@ class JavaSQLContext(val sqlContext: SQLContext) {
ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext))
/**
- * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
+ * Loads a JSON file (one object per line), returning the result as a JavaSchemaRDD.
* It goes through the entire dataset once to determine the schema.
- *
- * @group userf
*/
def jsonFile(path: String): JavaSchemaRDD =
jsonRDD(sqlContext.sparkContext.textFile(path))
/**
+ * :: Experimental ::
+ * Loads a JSON file (one object per line) and applies the given schema,
+ * returning the result as a JavaSchemaRDD.
+ */
+ @Experimental
+ def jsonFile(path: String, schema: JStructType): JavaSchemaRDD =
+ jsonRDD(sqlContext.sparkContext.textFile(path), schema)
+
+ /**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
- * [[JavaSchemaRDD]].
+ * JavaSchemaRDD.
* It goes through the entire dataset once to determine the schema.
- *
- * @group userf
*/
- def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
- new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))
+ def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
+ val appliedScalaSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))
+ val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+ val logicalPlan =
+ SparkLogicalPlan(ExistingRdd(appliedScalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+ new JavaSchemaRDD(sqlContext, logicalPlan)
+ }
+
+ /**
+ * :: Experimental ::
+ * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+ * returning the result as a JavaSchemaRDD.
+ */
+ @Experimental
+ def jsonRDD(json: JavaRDD[String], schema: JStructType): JavaSchemaRDD = {
+ val appliedScalaSchema =
+ Option(asScalaDataType(schema)).getOrElse(
+ JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))).asInstanceOf[StructType]
+ val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+ val logicalPlan =
+ SparkLogicalPlan(ExistingRdd(appliedScalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+ new JavaSchemaRDD(sqlContext, logicalPlan)
+ }
/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
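To see the Java-side flow end to end, here is a minimal local-mode sketch: build a schema with the DataType factories, wrap values in Rows via Row.create (defined later in this patch), and apply the schema. The data values are illustrative rather than taken from the patch's own tests; printSchema comes from SchemaRDDLike, shown earlier:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.api.java.JavaSQLContext;
    import org.apache.spark.sql.api.java.JavaSchemaRDD;
    import org.apache.spark.sql.api.java.Row;
    import org.apache.spark.sql.api.java.types.DataType;
    import org.apache.spark.sql.api.java.types.StructType;

    public class ApplySchemaExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "ApplySchemaExample");
        JavaSQLContext sqlContext = new JavaSQLContext(sc);

        StructType schema = DataType.createStructType(Arrays.asList(
          DataType.createStructField("name", DataType.StringType, false),
          DataType.createStructField("age", DataType.IntegerType, true)));

        // Each Row must line up with the schema, or a runtime
        // exception will surface when the rows are evaluated.
        JavaRDD<Row> rows = sc.parallelize(Arrays.asList(
          Row.create("Michael", 29),
          Row.create("Andy", 30)));

        JavaSchemaRDD people = sqlContext.applySchema(rows, schema);
        people.printSchema();
      }
    }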
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
index 8fbf13b..8245741 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -22,8 +22,11 @@ import java.util.{List => JList}
import org.apache.spark.Partitioner
import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.sql.api.java.types.StructType
+import org.apache.spark.sql.types.util.DataTypeConversions
import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import DataTypeConversions._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -53,6 +56,10 @@ class JavaSchemaRDD(
override def toString: String = baseSchemaRDD.toString
+ /** Returns the schema of this JavaSchemaRDD (represented by a StructType). */
+ def schema: StructType =
+ asJavaDataType(baseSchemaRDD.schema).asInstanceOf[StructType]
+
// =======================================================================
// Base RDD functions that do NOT change schema
// =======================================================================
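The new schema method gives Java callers programmatic access to the result schema. A small sketch continuing the applySchema example above (the people variable is the JavaSchemaRDD built there):

    import org.apache.spark.sql.api.java.types.StructField;
    import org.apache.spark.sql.api.java.types.StructType;

    StructType schema = people.schema();
    for (StructField field : schema.getFields()) {
      System.out.println(field.getName() + ": " + field.getDataType()
        + " (nullable = " + field.isNullable() + ")");
    }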
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
index 9b0dd21..6c67934 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
@@ -17,6 +17,11 @@
package org.apache.spark.sql.api.java
+import scala.annotation.varargs
+import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
+import scala.collection.JavaConversions
+import scala.math.BigDecimal
+
import org.apache.spark.sql.catalyst.expressions.{Row => ScalaRow}
/**
@@ -29,7 +34,7 @@ class Row(private[spark] val row: ScalaRow) extends Serializable {
/** Returns the value of column `i`. */
def get(i: Int): Any =
- row(i)
+ Row.toJavaValue(row(i))
/** Returns true if value at column `i` is NULL. */
def isNullAt(i: Int) = get(i) == null
@@ -89,5 +94,57 @@ class Row(private[spark] val row: ScalaRow) extends Serializable {
*/
def getString(i: Int): String =
row.getString(i)
+
+ def canEqual(other: Any): Boolean = other.isInstanceOf[Row]
+
+ override def equals(other: Any): Boolean = other match {
+ case that: Row =>
+ (that canEqual this) &&
+ row == that.row
+ case _ => false
+ }
+
+ override def hashCode(): Int = row.hashCode()
}
+object Row {
+
+ private def toJavaValue(value: Any): Any = value match {
+ // For values of this ScalaRow, we will do the conversion when
+ // they are actually accessed.
+ case row: ScalaRow => new Row(row)
+ case map: scala.collection.Map[_, _] =>
+ JavaConversions.mapAsJavaMap(
+ map.map {
+ case (key, value) => (toJavaValue(key), toJavaValue(value))
+ }
+ )
+ case seq: scala.collection.Seq[_] =>
+ JavaConversions.seqAsJavaList(seq.map(toJavaValue))
+ case decimal: BigDecimal => decimal.underlying()
+ case other => other
+ }
+
+ // TODO: Consolidate the toScalaValue here with the scalafy in JsonRDD?
+ private def toScalaValue(value: Any): Any = value match {
+ // Values of this row have been converted to Scala values.
+ case row: Row => row.row
+ case map: java.util.Map[_, _] =>
+ JMapWrapper(map).map {
+ case (key, value) => (toScalaValue(key), toScalaValue(value))
+ }
+ case list: java.util.List[_] =>
+ JListWrapper(list).map(toScalaValue)
+ case decimal: java.math.BigDecimal => BigDecimal(decimal)
+ case other => other
+ }
+
+ /**
+ * Creates a Row with the given values.
+ */
+ @varargs def create(values: Any*): Row = {
+ // Right now, we cannot use @varargs to annotate the constructor of
+ // org.apache.spark.sql.api.java.Row. See https://issues.scala-lang.org/browse/SI-8383.
+ new Row(ScalaRow(values.map(toScalaValue):_*))
+ }
+}
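Row.create is the Java-facing factory (the comment above explains why @varargs cannot go on the constructor itself), and get(i) converts Scala values back to Java types on access, as the toJavaValue comment notes. A small sketch:

    import org.apache.spark.sql.api.java.Row;

    public class RowExample {
      public static void main(String[] args) {
        Row row = Row.create(1, "a string", null);
        Object first = row.get(0);        // generic access by ordinal
        String s = row.getString(1);      // native accessor
        boolean isNull = row.isNullAt(2); // true
        System.out.println(first + " " + s + " " + isNull);
      }
    }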
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 6c2b553..bd29ee4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -25,33 +25,25 @@ import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
-import org.apache.spark.sql.{SQLContext, Logging}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.Logging
private[sql] object JsonRDD extends Logging {
+ private[sql] def jsonStringToRow(
+ json: RDD[String],
+ schema: StructType): RDD[Row] = {
+ parseJson(json).map(parsed => asRow(parsed, schema))
+ }
+
private[sql] def inferSchema(
- sqlContext: SQLContext,
json: RDD[String],
- samplingRatio: Double = 1.0): LogicalPlan = {
+ samplingRatio: Double = 1.0): StructType = {
require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
- val baseSchema = createSchema(allKeys)
-
- createLogicalPlan(json, baseSchema, sqlContext)
- }
-
- private def createLogicalPlan(
- json: RDD[String],
- baseSchema: StructType,
- sqlContext: SQLContext): LogicalPlan = {
- val schema = nullTypeToStringType(baseSchema)
-
- SparkLogicalPlan(
- ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))(sqlContext)
+ createSchema(allKeys)
}
private def createSchema(allKeys: Set[(String, DataType)]): StructType = {
@@ -75,8 +67,8 @@ private[sql] object JsonRDD extends Logging {
val (topLevel, structLike) = values.partition(_.size == 1)
val topLevelFields = topLevel.filter {
name => resolved.get(prefix ++ name).get match {
- case ArrayType(StructType(Nil)) => false
- case ArrayType(_) => true
+ case ArrayType(StructType(Nil), _) => false
+ case ArrayType(_, _) => true
case struct: StructType => false
case _ => true
}
@@ -90,7 +82,8 @@ private[sql] object JsonRDD extends Logging {
val structType = makeStruct(nestedFields, prefix :+ name)
val dataType = resolved.get(prefix :+ name).get
dataType match {
- case array: ArrayType => Some(StructField(name, ArrayType(structType), nullable = true))
+ case array: ArrayType =>
+ Some(StructField(name, ArrayType(structType, array.containsNull), nullable = true))
case struct: StructType => Some(StructField(name, structType, nullable = true))
// A dataType of StringType means that we have resolved type conflicts involving
// primitive types and complex types. So, the type of name has been relaxed to
@@ -109,6 +102,22 @@ private[sql] object JsonRDD extends Logging {
makeStruct(resolved.keySet.toSeq, Nil)
}
+ private[sql] def nullTypeToStringType(struct: StructType): StructType = {
+ val fields = struct.fields.map {
+ case StructField(fieldName, dataType, nullable) => {
+ val newType = dataType match {
+ case NullType => StringType
+ case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
+ case struct: StructType => nullTypeToStringType(struct)
+ case other: DataType => other
+ }
+ StructField(fieldName, newType, nullable)
+ }
+ }
+
+ StructType(fields)
+ }
+
/**
* Returns the most general data type for two given data types.
*/
@@ -139,8 +148,8 @@ private[sql] object JsonRDD extends Logging {
case StructField(name, _, _) => name
})
}
- case (ArrayType(elementType1), ArrayType(elementType2)) =>
- ArrayType(compatibleType(elementType1, elementType2))
+ case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
+ ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
// TODO: We should use JsonObjectStringType to mark that values of field will be
// strings and every string is a Json object.
case (_, _) => StringType
@@ -148,18 +157,13 @@ private[sql] object JsonRDD extends Logging {
}
}
- private def typeOfPrimitiveValue(value: Any): DataType = {
- value match {
- case value: java.lang.String => StringType
- case value: java.lang.Integer => IntegerType
- case value: java.lang.Long => LongType
+ private def typeOfPrimitiveValue: PartialFunction[Any, DataType] = {
+ ScalaReflection.typeOfObject orElse {
// Since we do not have a data type backed by BigInteger,
// when we see a Java BigInteger, we use DecimalType.
case value: java.math.BigInteger => DecimalType
- case value: java.lang.Double => DoubleType
+ // DecimalType's JvmType is Scala BigDecimal.
case value: java.math.BigDecimal => DecimalType
- case value: java.lang.Boolean => BooleanType
- case null => NullType
// Unexpected data type.
case _ => StringType
}
@@ -172,12 +176,13 @@ private[sql] object JsonRDD extends Logging {
* treat the element as String.
*/
private def typeOfArray(l: Seq[Any]): ArrayType = {
+ val containsNull = l.exists(v => v == null)
val elements = l.flatMap(v => Option(v))
if (elements.isEmpty) {
// If this JSON array is empty, we use NullType as a placeholder.
// If this array is not empty in other JSON objects, we can resolve
// the type after we have passed through all JSON objects.
- ArrayType(NullType)
+ ArrayType(NullType, containsNull)
} else {
val elementType = elements.map {
e => e match {
@@ -189,7 +194,7 @@ private[sql] object JsonRDD extends Logging {
}
}.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
- ArrayType(elementType)
+ ArrayType(elementType, containsNull)
}
}
@@ -216,15 +221,16 @@ private[sql] object JsonRDD extends Logging {
case (key: String, array: Seq[_]) => {
// The value associated with the key is an array.
typeOfArray(array) match {
- case ArrayType(StructType(Nil)) => {
+ case ArrayType(StructType(Nil), containsNull) => {
// The elements of this arrays are structs.
array.asInstanceOf[Seq[Map[String, Any]]].flatMap {
element => allKeysWithValueTypes(element)
}.map {
case (k, dataType) => (s"$key.$k", dataType)
- } :+ (key, ArrayType(StructType(Nil)))
+ } :+ (key, ArrayType(StructType(Nil), containsNull))
}
- case ArrayType(elementType) => (key, ArrayType(elementType)) :: Nil
+ case ArrayType(elementType, containsNull) =>
+ (key, ArrayType(elementType, containsNull)) :: Nil
}
}
case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
@@ -262,8 +268,11 @@ private[sql] object JsonRDD extends Logging {
// the ObjectMapper will take the last value associated with this duplicate key.
// For example: for {"key": 1, "key":2}, we will get "key"->2.
val mapper = new ObjectMapper()
- iter.map(record => mapper.readValue(record, classOf[java.util.Map[String, Any]]))
- }).map(scalafy).map(_.asInstanceOf[Map[String, Any]])
+ iter.map { record =>
+ val parsed = scalafy(mapper.readValue(record, classOf[java.util.Map[String, Any]]))
+ parsed.asInstanceOf[Map[String, Any]]
+ }
+ })
}
private def toLong(value: Any): Long = {
@@ -334,7 +343,7 @@ private[sql] object JsonRDD extends Logging {
null
} else {
desiredType match {
- case ArrayType(elementType) =>
+ case ArrayType(elementType, _) =>
value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
case StringType => toString(value)
case IntegerType => value.asInstanceOf[IntegerType.JvmType]
@@ -348,6 +357,7 @@ private[sql] object JsonRDD extends Logging {
}
private def asRow(json: Map[String,Any], schema: StructType): Row = {
+ // TODO: Reuse the row instead of creating a new one for every record.
val row = new GenericMutableRow(schema.fields.length)
schema.fields.zipWithIndex.foreach {
// StructType
@@ -356,7 +366,7 @@ private[sql] object JsonRDD extends Logging {
v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
// ArrayType(StructType)
- case (StructField(name, ArrayType(structType: StructType), _), i) =>
+ case (StructField(name, ArrayType(structType: StructType, _), _), i) =>
row.update(i,
json.get(name).flatMap(v => Option(v)).map(
v => v.asInstanceOf[Seq[Any]].map(
@@ -370,32 +380,4 @@ private[sql] object JsonRDD extends Logging {
row
}
-
- private def nullTypeToStringType(struct: StructType): StructType = {
- val fields = struct.fields.map {
- case StructField(fieldName, dataType, nullable) => {
- val newType = dataType match {
- case NullType => StringType
- case ArrayType(NullType) => ArrayType(StringType)
- case struct: StructType => nullTypeToStringType(struct)
- case other: DataType => other
- }
- StructField(fieldName, newType, nullable)
- }
- }
-
- StructType(fields)
- }
-
- private def asAttributes(struct: StructType): Seq[AttributeReference] = {
- struct.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
- }
-
- private def asStruct(attributes: Seq[AttributeReference]): StructType = {
- val fields = attributes.map {
- case AttributeReference(name, dataType, nullable) => StructField(name, dataType, nullable)
- }
-
- StructType(fields)
- }
}
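With a user-supplied schema, the schema-accepting JSON entry points skip the sampling/inference pass entirely; jsonStringToRow parses each record directly against the given StructType. A brief Java sketch, continuing with the sqlContext from the applySchema example and assuming a hypothetical people.json with name/age objects, one per line:

    import java.util.Arrays;
    import org.apache.spark.sql.api.java.JavaSchemaRDD;
    import org.apache.spark.sql.api.java.types.DataType;
    import org.apache.spark.sql.api.java.types.StructType;

    StructType schema = DataType.createStructType(Arrays.asList(
      DataType.createStructField("name", DataType.StringType, true),
      DataType.createStructField("age", DataType.IntegerType, true)));

    // No pass over the data to infer types; the schema is applied as-is.
    JavaSchemaRDD people = sqlContext.jsonFile("people.json", schema);
    people.printSchema();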
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/package-info.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package-info.java b/sql/core/src/main/scala/org/apache/spark/sql/package-info.java
new file mode 100644
index 0000000..5360361
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Allows the execution of relational queries, including those expressed in SQL using Spark.
+ */
+package org.apache.spark.sql;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
new file mode 100644
index 0000000..0995a4e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Allows the execution of relational queries, including those expressed in SQL, using Spark.
+ *
+ * @groupname dataType Data types
+ * @groupdesc dataType Spark SQL data types.
+ * @groupprio dataType -3
+ * @groupname field Field
+ * @groupprio field -2
+ * @groupname row Row
+ * @groupprio row -1
+ */
+package object sql {
+
+ protected[sql] type Logging = com.typesafe.scalalogging.slf4j.Logging
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * Represents one row of output from a relational operator.
+ * @group row
+ */
+ @DeveloperApi
+ type Row = catalyst.expressions.Row
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * A [[Row]] object can be constructed by providing field values. Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * // Create a Row from values.
+ * Row(value1, value2, value3, ...)
+ * // Create a Row from a Seq of values.
+ * Row.fromSeq(Seq(value1, value2, ...))
+ * }}}
+ *
+ * A value of a row can be accessed through both generic access by ordinal,
+ * which will incur boxing overhead for primitives, and native primitive access.
+ * An example of generic access by ordinal:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val row = Row(1, true, "a string", null)
+ * // row: Row = [1,true,a string,null]
+ * val firstValue = row(0)
+ * // firstValue: Any = 1
+ * val fourthValue = row(3)
+ * // fourthValue: Any = null
+ * }}}
+ *
+ * For native primitive access, it is invalid to use the native primitive interface to retrieve
+ * a value that is null; instead, a user must check `isNullAt` before attempting to retrieve a
+ * value that might be null.
+ * An example of native primitive access:
+ * {{{
+ * // using the row from the previous example.
+ * val firstValue = row.getInt(0)
+ * // firstValue: Int = 1
+ * val isNull = row.isNullAt(3)
+ * // isNull: Boolean = true
+ * }}}
+ *
+ * Interfaces related to native primitive access are:
+ *
+ * `isNullAt(i: Int): Boolean`
+ *
+ * `getInt(i: Int): Int`
+ *
+ * `getLong(i: Int): Long`
+ *
+ * `getDouble(i: Int): Double`
+ *
+ * `getFloat(i: Int): Float`
+ *
+ * `getBoolean(i: Int): Boolean`
+ *
+ * `getShort(i: Int): Short`
+ *
+ * `getByte(i: Int): Byte`
+ *
+ * `getString(i: Int): String`
+ *
+ * Fields in a [[Row]] object can be extracted in a pattern match. Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val pairs = sql("SELECT key, value FROM src").rdd.map {
+ * case Row(key: Int, value: String) =>
+ * key -> value
+ * }
+ * }}}
+ *
+ * @group row
+ */
+ @DeveloperApi
+ val Row = catalyst.expressions.Row
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The base type of all Spark SQL data types.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ type DataType = catalyst.types.DataType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `String` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val StringType = catalyst.types.StringType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Array[Byte]` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val BinaryType = catalyst.types.BinaryType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Boolean` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val BooleanType = catalyst.types.BooleanType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `java.sql.Timestamp` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val TimestampType = catalyst.types.TimestampType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `scala.math.BigDecimal` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val DecimalType = catalyst.types.DecimalType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Double` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val DoubleType = catalyst.types.DoubleType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Float` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val FloatType = catalyst.types.FloatType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Byte` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val ByteType = catalyst.types.ByteType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Int` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val IntegerType = catalyst.types.IntegerType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Long` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val LongType = catalyst.types.LongType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Short` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val ShortType = catalyst.types.ShortType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type for collections of multiple values.
+ * Internally these are represented as columns that contain a `scala.collection.Seq`.
+ *
+ * An [[ArrayType]] object comprises two fields, `elementType: [[DataType]]` and
+ * `containsNull: Boolean`. The `elementType` field specifies the type of
+ * array elements. The `containsNull` field specifies whether the array can contain `null` values.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ type ArrayType = catalyst.types.ArrayType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * An [[ArrayType]] object can be constructed in two ways,
+ * {{{
+ * ArrayType(elementType: DataType, containsNull: Boolean)
+ * }}} and
+ * {{{
+ * ArrayType(elementType: DataType)
+ * }}}
+ * For `ArrayType(elementType)`, the `containsNull` field is set to `false`.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val ArrayType = catalyst.types.ArrayType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Map`s. A [[MapType]] object comprises three fields,
+ * `keyType: [[DataType]]`, `valueType: [[DataType]]` and `valueContainsNull: Boolean`.
+ * The `keyType` field specifies the type of keys in the map.
+ * The `valueType` field specifies the type of values in the map.
+ * The `valueContainsNull` field specifies whether values of this map can be `null`.
+ * Keys of a MapType column are not allowed to be `null`.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ type MapType = catalyst.types.MapType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * A [[MapType]] object can be constructed in two ways,
+ * {{{
+ * MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
+ * }}} and
+ * {{{
+ * MapType(keyType: DataType, valueType: DataType)
+ * }}}
+ * For `MapType(keyType: DataType, valueType: DataType)`,
+ * the `valueContainsNull` field is set to `true`.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val MapType = catalyst.types.MapType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * The data type representing [[Row]]s.
+ * A [[StructType]] object comprises a [[Seq]] of [[StructField]]s.
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ type StructType = catalyst.types.StructType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * A [[StructType]] object can be constructed by
+ * {{{
+ * StructType(fields: Seq[StructField])
+ * }}}
+ * For a [[StructType]] object, one or multiple [[StructField]]s can be extracted by name.
+ * If multiple [[StructField]]s are extracted, a [[StructType]] object will be returned.
+ * Names without a matching field will be ignored; when a single StructField is
+ * extracted by a non-existent name, `null` will be returned.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val struct =
+ * StructType(
+ * StructField("a", IntegerType, true) ::
+ * StructField("b", LongType, false) ::
+ * StructField("c", BooleanType, false) :: Nil)
+ *
+ * // Extract a single StructField.
+ * val singleField = struct("b")
+ * // singleField: StructField = StructField(b,LongType,false)
+ *
+ * // This struct does not have a field called "d". null will be returned.
+ * val nonExisting = struct("d")
+ * // nonExisting: StructField = null
+ *
+ * // Extract multiple StructFields. Field names are provided in a set.
+ * // A StructType object will be returned.
+ * val twoFields = struct(Set("b", "c"))
+ * // twoFields: StructType =
+ * // StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
+ *
+ * // Names that do not have matching fields will be ignored.
+ * // In the case shown below, "d" will be ignored and
+ * // the call is treated as struct(Set("b", "c")).
+ * val ignoreNonExisting = struct(Set("b", "c", "d"))
+ * // ignoreNonExisting: StructType =
+ * // StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
+ * }}}
+ *
+ * A [[Row]] object is used as a value of the StructType.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val innerStruct =
+ * StructType(
+ * StructField("f1", IntegerType, true) ::
+ * StructField("f2", LongType, false) ::
+ * StructField("f3", BooleanType, false) :: Nil)
+ *
+ * val struct = StructType(
+ * StructField("a", innerStruct, true) :: Nil)
+ *
+ * // Create a Row with the schema defined by struct
+ * val row = Row(Row(1, 2, true))
+ * // row: Row = [[1,2,true]]
+ * }}}
+ *
+ * @group dataType
+ */
+ @DeveloperApi
+ val StructType = catalyst.types.StructType
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * A [[StructField]] object represents a field in a [[StructType]] object.
+ * A [[StructField]] object comprises three fields, `name: [[String]]`, `dataType: [[DataType]]`,
+ * and `nullable: Boolean`. The `name` field is the name of a `StructField`. The `dataType`
+ * field specifies the data type of a `StructField`.
+ * The `nullable` field specifies whether values of a `StructField` can contain `null` values.
+ *
+ * @group field
+ */
+ @DeveloperApi
+ type StructField = catalyst.types.StructField
+
+ /**
+ * :: DeveloperApi ::
+ *
+ * A [[StructField]] object can be constructed by
+ * {{{
+ * StructField(name: String, dataType: DataType, nullable: Boolean)
+ * }}}
+ *
+ * @group field
+ */
+ @DeveloperApi
+ val StructField = catalyst.types.StructField
+}
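
Taken together, the aliases above let Scala users build schemas directly. A minimal sketch (the field names are hypothetical) combining the documented defaults, where `MapType(keyType, valueType)` sets `valueContainsNull` to `true`:

    import org.apache.spark.sql._

    // An array that may contain nulls, and a map whose values are nullable
    // by default (valueContainsNull = true).
    val schema =
      StructType(
        StructField("tags", ArrayType(StringType, containsNull = true), true) ::
        StructField("scores", MapType(StringType, DoubleType), false) :: Nil)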
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index de8fe2d..0a3b59c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -75,21 +75,21 @@ private[sql] object CatalystConverter {
val fieldType: DataType = field.dataType
fieldType match {
// For native JVM types we use a converter with native arrays
- case ArrayType(elementType: NativeType) => {
+ case ArrayType(elementType: NativeType, false) => {
new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
}
// This is for other types of arrays, including those with nested fields
- case ArrayType(elementType: DataType) => {
+ case ArrayType(elementType: DataType, false) => {
new CatalystArrayConverter(elementType, fieldIndex, parent)
}
case StructType(fields: Seq[StructField]) => {
new CatalystStructConverter(fields.toArray, fieldIndex, parent)
}
- case MapType(keyType: DataType, valueType: DataType) => {
+ case MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) => {
new CatalystMapConverter(
Array(
new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
- new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)),
+ new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, valueContainsNull)),
fieldIndex,
parent)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 39294a3..6d4ce32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -172,10 +172,10 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
if (value != null) {
schema match {
- case t @ ArrayType(_) => writeArray(
+ case t @ ArrayType(_, false) => writeArray(
t,
value.asInstanceOf[CatalystConverter.ArrayScalaType[_]])
- case t @ MapType(_, _) => writeMap(
+ case t @ MapType(_, _, _) => writeMap(
t,
value.asInstanceOf[CatalystConverter.MapScalaType[_, _]])
case t @ StructType(_) => writeStruct(
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 58370b9..aaef1a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -116,7 +116,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
case ParquetOriginalType.LIST => { // TODO: check enums!
assert(groupType.getFieldCount == 1)
val field = groupType.getFields.apply(0)
- new ArrayType(toDataType(field))
+ ArrayType(toDataType(field), containsNull = false)
}
case ParquetOriginalType.MAP => {
assert(
@@ -130,7 +130,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
val valueType = toDataType(keyValueGroup.getFields.apply(1))
assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
- new MapType(keyType, valueType)
+ // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
+ // here.
+ MapType(keyType, valueType)
}
case _ => {
// Note: the order of these checks is important!
@@ -140,10 +142,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
val valueType = toDataType(keyValueGroup.getFields.apply(1))
assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
- new MapType(keyType, valueType)
+ // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
+ // here.
+ MapType(keyType, valueType)
} else if (correspondsToArray(groupType)) { // ArrayType
val elementType = toDataType(groupType.getFields.apply(0))
- new ArrayType(elementType)
+ ArrayType(elementType, containsNull = false)
} else { // everything else: StructType
val fields = groupType
.getFields
@@ -151,7 +155,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
ptype.getName,
toDataType(ptype),
ptype.getRepetition != Repetition.REQUIRED))
- new StructType(fields)
+ StructType(fields)
}
}
}
@@ -234,7 +238,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
new ParquetPrimitiveType(repetition, primitiveType, name, originalType.orNull)
}.getOrElse {
ctype match {
- case ArrayType(elementType) => {
+ case ArrayType(elementType, false) => {
val parquetElementType = fromDataType(
elementType,
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
@@ -248,7 +252,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
new ParquetGroupType(repetition, name, fields)
}
- case MapType(keyType, valueType) => {
+ case MapType(keyType, valueType, _) => {
val parquetKeyType =
fromDataType(
keyType,
http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
new file mode 100644
index 0000000..d1aa3c8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types.util
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.api.java.types.{DataType => JDataType, StructField => JStructField}
+
+import scala.collection.JavaConverters._
+
+protected[sql] object DataTypeConversions {
+
+ /**
+ * Returns the equivalent StructField in Java for the given StructField in Scala.
+ */
+ def asJavaStructField(scalaStructField: StructField): JStructField = {
+ JDataType.createStructField(
+ scalaStructField.name,
+ asJavaDataType(scalaStructField.dataType),
+ scalaStructField.nullable)
+ }
+
+ /**
+ * Returns the equivalent DataType in Java for the given DataType in Scala.
+ */
+ def asJavaDataType(scalaDataType: DataType): JDataType = scalaDataType match {
+ case StringType => JDataType.StringType
+ case BinaryType => JDataType.BinaryType
+ case BooleanType => JDataType.BooleanType
+ case TimestampType => JDataType.TimestampType
+ case DecimalType => JDataType.DecimalType
+ case DoubleType => JDataType.DoubleType
+ case FloatType => JDataType.FloatType
+ case ByteType => JDataType.ByteType
+ case IntegerType => JDataType.IntegerType
+ case LongType => JDataType.LongType
+ case ShortType => JDataType.ShortType
+
+ case arrayType: ArrayType => JDataType.createArrayType(
+ asJavaDataType(arrayType.elementType), arrayType.containsNull)
+ case mapType: MapType => JDataType.createMapType(
+ asJavaDataType(mapType.keyType),
+ asJavaDataType(mapType.valueType),
+ mapType.valueContainsNull)
+ case structType: StructType => JDataType.createStructType(
+ structType.fields.map(asJavaStructField).asJava)
+ }
+
+ /**
+ * Returns the equivalent StructField in Scala for the given StructField in Java.
+ */
+ def asScalaStructField(javaStructField: JStructField): StructField = {
+ StructField(
+ javaStructField.getName,
+ asScalaDataType(javaStructField.getDataType),
+ javaStructField.isNullable)
+ }
+
+ /**
+ * Returns the equivalent DataType in Scala for the given DataType in Java.
+ */
+ def asScalaDataType(javaDataType: JDataType): DataType = javaDataType match {
+ case stringType: org.apache.spark.sql.api.java.types.StringType =>
+ StringType
+ case binaryType: org.apache.spark.sql.api.java.types.BinaryType =>
+ BinaryType
+ case booleanType: org.apache.spark.sql.api.java.types.BooleanType =>
+ BooleanType
+ case timestampType: org.apache.spark.sql.api.java.types.TimestampType =>
+ TimestampType
+ case decimalType: org.apache.spark.sql.api.java.types.DecimalType =>
+ DecimalType
+ case doubleType: org.apache.spark.sql.api.java.types.DoubleType =>
+ DoubleType
+ case floatType: org.apache.spark.sql.api.java.types.FloatType =>
+ FloatType
+ case byteType: org.apache.spark.sql.api.java.types.ByteType =>
+ ByteType
+ case integerType: org.apache.spark.sql.api.java.types.IntegerType =>
+ IntegerType
+ case longType: org.apache.spark.sql.api.java.types.LongType =>
+ LongType
+ case shortType: org.apache.spark.sql.api.java.types.ShortType =>
+ ShortType
+
+ case arrayType: org.apache.spark.sql.api.java.types.ArrayType =>
+ ArrayType(asScalaDataType(arrayType.getElementType), arrayType.isContainsNull)
+ case mapType: org.apache.spark.sql.api.java.types.MapType =>
+ MapType(
+ asScalaDataType(mapType.getKeyType),
+ asScalaDataType(mapType.getValueType),
+ mapType.isValueContainsNull)
+ case structType: org.apache.spark.sql.api.java.types.StructType =>
+ StructType(structType.getFields.map(asScalaStructField))
+ }
+}
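
As a usage sketch for the conversions above (note the assumption: DataTypeConversions is protected[sql], so this only compiles inside the org.apache.spark.sql package):

    import org.apache.spark.sql._
    import org.apache.spark.sql.types.util.DataTypeConversions._

    // Round-tripping a Scala DataType through the Java API types should
    // yield an equal DataType, since both sides carry the same fields.
    val scalaType: DataType = ArrayType(IntegerType, containsNull = true)
    val javaType = asJavaDataType(scalaType)
    assert(asScalaDataType(javaType) == scalaType)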