Posted to reviews@spark.apache.org by gatorsmile <gi...@git.apache.org> on 2015/12/16 21:01:48 UTC
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
GitHub user gatorsmile opened a pull request:
https://github.com/apache/spark/pull/10335
[Spark-12374][SPARK-12150][SQL] Adding logical/physical operators for Range
Based on the suggestions from @marmbrus, I added logical/physical operators for Range to improve its performance.
Also added another API to resolve the JIRA SPARK-12150.
Could you take a look at my implementation, @marmbrus? If it is not what you had in mind, I can rework it. : )
Thank you very much!
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/gatorsmile/spark rangeOperators
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/10335.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #10335
----
commit 2aab4d648fa634ec427e37ae82b0328fad159720
Author: gatorsmile <ga...@gmail.com>
Date: 2015-12-16T19:56:07Z
adding logical/physical operators for Range
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165849874
@gatorsmile small follow-up on the benchmarking. If I execute the following code (note the ```.rdd.map(_.getLong(0))```):
val startTime = System.currentTimeMillis;
sqlContext.range(0, 10000000, 1, 15).rdd.map(_.getLong(0)).collect();
val endTime = System.currentTimeMillis;
val elapsed = (endTime - startTime)/ 1000.0
I get an average of 845 ms per run (versus 477 ms for ```sc.range```).
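A sketch of a reusable timing helper for this kind of comparison, assuming plain Scala with no Spark dependency; `RangeBench` and `meanMillis` are hypothetical names, not Spark APIs. It runs a few untimed warm-up iterations first and uses `System.nanoTime`, which is monotonic, instead of subtracting two `System.currentTimeMillis` readings.

```scala
object RangeBench {
  /** Runs `body` `warmup` times untimed, then `runs` times timed, and returns the mean time in milliseconds. */
  def meanMillis(runs: Int, warmup: Int = 1)(body: => Unit): Double = {
    require(runs > 0, "runs must be positive")
    (1 to warmup).foreach(_ => body) // warm-up runs (JIT compilation, caches) are not timed
    val timings = (1 to runs).map { _ =>
      val t0 = System.nanoTime() // monotonic clock, unlike currentTimeMillis
      body
      (System.nanoTime() - t0) / 1e6 // nanoseconds -> milliseconds
    }
    timings.sum / runs
  }
}
```

In the spark-shell it could wrap the snippet above, e.g. `RangeBench.meanMillis(runs = 5) { sqlContext.range(0, 10000000, 1, 15).rdd.map(_.getLong(0)).collect() }`.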
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47954304
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode {
+
+ override def outputsUnsafeRows: Boolean = true
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext
+ .sparkContext
+ .parallelize(0 until numSlices, numSlices)
+ .mapPartitionsWithIndex((i, _) => {
+ val partitionStart = (i * numElements) / numSlices * step + start
+ val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+ def getSafeMargin(bi: BigInt): Long =
+ if (bi.isValidLong) {
+ bi.toLong
+ } else if (bi > 0) {
+ Long.MaxValue
+ } else {
+ Long.MinValue
+ }
+ val safePartitionStart = getSafeMargin(partitionStart)
+ val safePartitionEnd = getSafeMargin(partitionEnd)
+ val bufferHolder = new BufferHolder(LongType.defaultSize)
--- End diff --
uh... True! Let me correct it. Thank you!
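The slice boundaries in this diff are computed in `BigInt` and then clamped back into the `Long` range. A standalone sketch of just that arithmetic, with no Spark classes; `RangePartitions`, `clampToLong` and `bounds` are hypothetical names (`clampToLong` mirrors `getSafeMargin`).

```scala
object RangePartitions {
  /** Clamps a BigInt into the Long range (the same idea as getSafeMargin in the diff). */
  def clampToLong(bi: BigInt): Long =
    if (bi.isValidLong) bi.toLong
    else if (bi > 0) Long.MaxValue
    else Long.MinValue

  /**
   * Start (inclusive) and end (exclusive) values of slice `i` out of `numSlices`.
   * All arithmetic happens in BigInt, so i * numElements and the step offset cannot overflow.
   */
  def bounds(i: Int, numSlices: Int, numElements: BigInt, start: Long, step: Long): (Long, Long) = {
    val partitionStart = BigInt(i) * numElements / numSlices * step + start
    val partitionEnd = BigInt(i + 1) * numElements / numSlices * step + start
    (clampToLong(partitionStart), clampToLong(partitionEnd))
  }
}
```

For 10 elements in 3 slices starting at 0 with step 1, this gives the slices [0, 3), [3, 6) and [6, 10).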
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165298096
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47868/
Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165914289
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48030/
Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165238477
Sure, will do it! Thank you for your guidance!
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165677516
New range API using logical/physical operators with `count` (workload **1,000,000,000** rows):
```
scala> val startTime = System.currentTimeMillis; sqlContext.range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416360107
endTime: Long = 1450416368590
start: java.sql.Timestamp = 2015-12-17 21:26:00.107
end: java.sql.Timestamp = 2015-12-17 21:26:08.59
elapsed: Double = 8.483
scala> val startTime = System.currentTimeMillis; sqlContext.range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416371664
endTime: Long = 1450416380654
start: java.sql.Timestamp = 2015-12-17 21:26:11.664
end: java.sql.Timestamp = 2015-12-17 21:26:20.654
elapsed: Double = 8.99
scala> val startTime = System.currentTimeMillis; sqlContext.range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416383434
endTime: Long = 1450416392369
start: java.sql.Timestamp = 2015-12-17 21:26:23.434
end: java.sql.Timestamp = 2015-12-17 21:26:32.369
elapsed: Double = 8.935
```
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47834560
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode
+{
+ override def outputsUnsafeRows: Boolean = true
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext
+ .sparkContext
+ .parallelize(0 until numSlices, numSlices)
+ .mapPartitionsWithIndex((i, _) => {
+ val partitionStart = (i * numElements) / numSlices * step + start
+ val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+ def getSafeMargin(bi: BigInt): Long =
+ if (bi.isValidLong) {
+ bi.toLong
+ } else if (bi > 0) {
+ Long.MaxValue
+ } else {
+ Long.MinValue
+ }
+ val safePartitionStart = getSafeMargin(partitionStart)
+ val safePartitionEnd = getSafeMargin(partitionEnd)
+ val bufferHolder = new BufferHolder(LongType.defaultSize)
+ val unsafeRow = new UnsafeRow
+
+ new Iterator[InternalRow] {
+ private[this] var number: Long = safePartitionStart
+ private[this] var overflow: Boolean = false
+
+ override def hasNext =
+ if (!overflow) {
+ if (step > 0) {
+ number < safePartitionEnd
+ } else {
+ number > safePartitionEnd
+ }
+ } else false
+
+ override def next() = {
+ val ret = number
+ number += step
+ if (number < ret ^ step < 0) {
+ // we have Long.MaxValue + Long.MaxValue < Long.MaxValue
+ // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step
+ // back, we are pretty sure that we have an overflow.
+ overflow = true
+ }
+
+ bufferHolder.reset()
+ unsafeRow.pointTo(bufferHolder.buffer, 1, bufferHolder.totalSize())
--- End diff --
I am fine with both, as long as we move it out of the ```next``` call.
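The overflow test in `next()` works because adding a positive `step` can only make `number` smaller if the addition wrapped past `Long.MaxValue` (and symmetrically for a negative `step`). Below is a plain-Scala sketch of the same iterator with the row construction stripped out, so it can be tried without Spark; `RangeIterator.values` is a hypothetical name. Scala's `^` binds more loosely than `<`, so `number < ret ^ step < 0` in the diff already parses as `(number < ret) ^ (step < 0)`; the sketch just writes the parentheses out.

```scala
object RangeIterator {
  /** Yields start, start + step, ... while still before `end`, stopping early if the Long counter wraps around. */
  def values(start: Long, end: Long, step: Long): Iterator[Long] = {
    require(step != 0, "step cannot be 0")
    new Iterator[Long] {
      private[this] var number: Long = start
      private[this] var overflow: Boolean = false

      override def hasNext: Boolean =
        !overflow && (if (step > 0) number < end else number > end)

      override def next(): Long = {
        if (!hasNext) throw new NoSuchElementException("range exhausted")
        val ret = number
        number += step
        // A positive step can only make `number` smaller (and a negative step larger)
        // if the addition wrapped around, so that signals overflow.
        if ((number < ret) ^ (step < 0)) overflow = true
        ret
      }
    }
  }
}
```

Without the overflow flag, `values(Long.MaxValue - 2, Long.MaxValue, 5)` would wrap to a large negative number that is still below `end` and keep producing values; with it, the iterator stops after the first element.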
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47829760
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode
+{
--- End diff --
Will fix it. Thanks!
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/10335
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-166433015
Thanks, merging to master.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165236260
Merged build finished. Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165329990
**[Test build #47889 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47889/consoleFull)** for PR 10335 at commit [`258b40a`](https://github.com/apache/spark/commit/258b40a0d39d6fa87b5db83e6f7fa3b4fe13b592).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Range(`
  * `case class Range(`
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47853387
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
--- End diff --
I fully understand your concern. Readability and maintainability are always critical. If others have the same comment, maybe we should also make the same change in the RDD range API to keep the two consistent?
Now, let me do the performance tests first. : ) Thank you!
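For comparison, one possible more explicit way to write `numElements`, sketched in plain Scala as a hypothetical `RangeCount.numElements`; it is not the code in this PR. It handles the empty-range case up front (returning 0) and spells out the ceiling division with `BigInt./%`. One behavioral difference, if I read the diff correctly: for an empty range such as start 0, end 10, step -3, the diff's expression evaluates to `(10 - 0) / -3 = -3`, whereas this version returns 0.

```scala
object RangeCount {
  /** Number of values start, start + step, ... strictly before end; 0 when the range is empty. */
  def numElements(start: Long, end: Long, step: Long): BigInt = {
    require(step != 0, "step cannot be 0")
    val span = BigInt(end) - BigInt(start) // BigInt because end - start can overflow a Long
    if (span == 0 || (span > 0) != (step > 0)) BigInt(0) // empty, or stepping away from end
    else {
      // Ceiling division: a partial final step still contributes one element.
      val (q, r) = span /% BigInt(step)
      if (r == 0) q else q + 1
    }
  }
}
```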
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165644454
**[Test build #47972 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47972/consoleFull)** for PR 10335 at commit [`576fea9`](https://github.com/apache/spark/commit/576fea91c18e91e3357ea2d2f44631033b66c71a).
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47830240
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
+ (safeEnd - safeStart) / step
+ } else {
+ // the remainder has the same sign with range, could add 1 more
+ (safeEnd - safeStart) / step + 1
+ }
+ }
+
+ val output: Seq[Attribute] =
--- End diff --
I think it would be better to make this an argument to the constructor and have a factory in the companion object that creates one automatically.
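One way to read this suggestion, sketched with hypothetical stand-ins (`Attr`, `newAttr`, `RangeNode`) rather than the real Catalyst classes: `output` becomes a constructor parameter, and a 4-argument `apply` in the companion object creates the default `id` attribute. My assumption about the motivation is that Catalyst attributes carry an identity (an expression ID), so a value computed in the class body would be regenerated whenever the node is rebuilt, while a constructor argument survives `copy` unchanged.

```scala
object RangeFactorySketch {
  // Hypothetical stand-in for a Catalyst attribute; `id` plays the role of an expression ID.
  final case class Attr(name: String, id: Long)

  private var lastId = 0L

  /** Creates an attribute with a fresh, never-reused id. */
  def newAttr(name: String): Attr = { lastId += 1; Attr(name, lastId) }

  // `output` is an ordinary constructor argument, so copy() keeps the very same attribute.
  final case class RangeNode(start: Long, end: Long, step: Long, numSlices: Int, output: Seq[Attr]) {
    require(step != 0, "step cannot be 0")
  }

  object RangeNode {
    /** Factory for the common case: generate a single `id` column automatically. */
    def apply(start: Long, end: Long, step: Long, numSlices: Int): RangeNode =
      RangeNode(start, end, step, numSlices, Seq(newAttr("id")))
  }
}
```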
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165671763
New range API using logical/physical operators:
```
scala> val startTime = System.currentTimeMillis; sqlContext.range(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450413004393
endTime: Long = 1450413022682
start: java.sql.Timestamp = 2015-12-17 20:30:04.393
end: java.sql.Timestamp = 2015-12-17 20:30:22.682
elapsed: Double = 18.289
scala> val startTime = System.currentTimeMillis; sqlContext.range(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450413029542
endTime: Long = 1450413050828
start: java.sql.Timestamp = 2015-12-17 20:30:29.542
end: java.sql.Timestamp = 2015-12-17 20:30:50.828
elapsed: Double = 21.286
scala> val startTime = System.currentTimeMillis; sqlContext.range(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450413052748
endTime: Long = 1450413072478
start: java.sql.Timestamp = 2015-12-17 20:30:52.748
end: java.sql.Timestamp = 2015-12-17 20:31:12.478
elapsed: Double = 19.73
```
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165298095
Merged build finished. Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165325669
retest this please
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165677421
RDD range API with `count` (workload **1,000,000,000** rows):
```
scala> val startTime = System.currentTimeMillis; sc.range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416352767
endTime: Long = 1450416353302
start: java.sql.Timestamp = 2015-12-17 21:25:52.767
end: java.sql.Timestamp = 2015-12-17 21:25:53.302
elapsed: Double = 0.535
scala> val startTime = System.currentTimeMillis; sc.range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416354143
endTime: Long = 1450416354673
start: java.sql.Timestamp = 2015-12-17 21:25:54.143
end: java.sql.Timestamp = 2015-12-17 21:25:54.673
elapsed: Double = 0.53
scala> val startTime = System.currentTimeMillis; sc.range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416355390
endTime: Long = 1450416355936
start: java.sql.Timestamp = 2015-12-17 21:25:55.39
end: java.sql.Timestamp = 2015-12-17 21:25:55.936
elapsed: Double = 0.546
```
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165678995
When the workload is small, both APIs finish in less than *1* second, so I increased the number of rows by a factor of **100**. Compared with the old Range API, the new version is **3 times faster**.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165928400
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48032/
Test PASSed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165913189
Thank you very much! @marmbrus
I am trying to get an account. Hopefully, next time I can directly use your performance benchmarks. Otherwise, I will try to mimic your benchmarking on my local laptop. : )
I also changed the code to use `createFromByteArray`. It looks much more concise now.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47827840
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode
+{
+ override def outputsUnsafeRows: Boolean = true
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext
+ .sparkContext
+ .parallelize(0 until numSlices, numSlices)
+ .mapPartitionsWithIndex((i, _) => {
+ val partitionStart = (i * numElements) / numSlices * step + start
+ val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+ def getSafeMargin(bi: BigInt): Long =
+ if (bi.isValidLong) {
+ bi.toLong
+ } else if (bi > 0) {
+ Long.MaxValue
+ } else {
+ Long.MinValue
+ }
+ val safePartitionStart = getSafeMargin(partitionStart)
+ val safePartitionEnd = getSafeMargin(partitionEnd)
+ val bufferHolder = new BufferHolder(LongType.defaultSize)
+ val unsafeRow = new UnsafeRow
+
+ new Iterator[InternalRow] {
+ private[this] var number: Long = safePartitionStart
+ private[this] var overflow: Boolean = false
+
+ override def hasNext =
+ if (!overflow) {
+ if (step > 0) {
+ number < safePartitionEnd
+ } else {
+ number > safePartitionEnd
+ }
+ } else false
+
+ override def next() = {
+ val ret = number
+ number += step
+ if (number < ret ^ step < 0) {
+ // we have Long.MaxValue + Long.MaxValue < Long.MaxValue
+ // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step
+ // back, we are pretty sure that we have an overflow.
+ overflow = true
+ }
+
+ bufferHolder.reset()
+ unsafeRow.pointTo(bufferHolder.buffer, 1, bufferHolder.totalSize())
--- End diff --
Why point to the same buffer after every iteration? We could do this during the construction of the iterator. ```BufferHolder``` might be overkill here, pointing to an array of 16 bytes should also do the trick.
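A plain-JVM analogue of this suggestion, not Spark's `UnsafeRow` API: the backing array and its wrapper are created once when the iterator is built, and `next()` only overwrites the value slot. The 16-byte layout (an 8-byte null-tracking word followed by one 8-byte slot for the long) and the little-endian byte order are assumptions for illustration; `SingleLongRows` is a hypothetical name.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object SingleLongRows {
  // Assumed layout for a one-column row: 8-byte null word + one 8-byte long slot.
  val RowSize = 16

  /** Yields the serialized row for each value, reusing one buffer allocated up front. */
  def rows(values: Iterator[Long]): Iterator[Array[Byte]] = {
    val bytes = new Array[Byte](RowSize) // allocated once, outside next()
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(0, 0L) // null word: no nulls, written once
    values.map { v =>
      buf.putLong(8, v) // per-row work is a single 8-byte write
      bytes
    }
  }
}
```

As with a reused row object, every element returned is the same array, so a consumer that buffers rows must copy them first.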
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165914287
Merged build finished. Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47952489
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode {
+
+ override def outputsUnsafeRows: Boolean = true
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext
+ .sparkContext
+ .parallelize(0 until numSlices, numSlices)
+ .mapPartitionsWithIndex((i, _) => {
+ val partitionStart = (i * numElements) / numSlices * step + start
+ val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+ def getSafeMargin(bi: BigInt): Long =
+ if (bi.isValidLong) {
+ bi.toLong
+ } else if (bi > 0) {
+ Long.MaxValue
+ } else {
+ Long.MinValue
+ }
+ val safePartitionStart = getSafeMargin(partitionStart)
+ val safePartitionEnd = getSafeMargin(partitionEnd)
+ val bufferHolder = new BufferHolder(LongType.defaultSize)
--- End diff --
I am not confident that this works. Calling this constructor allocates only 8 bytes. Without growing the buffer, you hand an 8-byte array to the ```UnsafeRow```, which is what happens here. You would need at least 16 bytes for ```UnsafeRow``` to work (8 for the null bitset and 8 for the long).
```BufferHolder``` is meant to be used with the ```Unsafe*Writer``` classes, and I don't think it adds much value here. I think we should just use a 16-byte array instead.
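The 16-byte figure can be checked with a little arithmetic. A minimal sketch, assuming (as we understand `UnsafeRow`'s layout) that the null bitset uses one bit per field rounded up to whole 8-byte words, and that each fixed-width field such as a `Long` takes 8 bytes; the object and method names are hypothetical.

```scala
object UnsafeRowSizeSketch {
  // Null-tracking bitset: one bit per field, rounded up to whole 8-byte words
  def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

  // Size of a row whose fields are all 8-byte fixed-width values (e.g. Long)
  def fixedWidthRowSize(numFields: Int): Int = bitSetWidthInBytes(numFields) + 8 * numFields
}
```

For the single `id` column this gives 8 + 8 = 16 bytes, twice the 8 bytes (`LongType.defaultSize`) passed to the `BufferHolder` constructor in the diff.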
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165671653
RDD range API:
```
scala> val startTime = System.currentTimeMillis; sc.range(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450412989642
endTime: Long = 1450412990221
start: java.sql.Timestamp = 2015-12-17 20:29:49.642
end: java.sql.Timestamp = 2015-12-17 20:29:50.221
elapsed: Double = 0.579
scala> val startTime = System.currentTimeMillis; sc.range(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450412991330
endTime: Long = 1450412991892
start: java.sql.Timestamp = 2015-12-17 20:29:51.33
end: java.sql.Timestamp = 2015-12-17 20:29:51.892
elapsed: Double = 0.562
scala> val startTime = System.currentTimeMillis; sc.range(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450412995824
endTime: Long = 1450412996666
start: java.sql.Timestamp = 2015-12-17 20:29:55.824
end: java.sql.Timestamp = 2015-12-17 20:29:56.666
elapsed: Double = 0.842
```
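The three runs above use `System.currentTimeMillis` and range from 0.562 s to 0.842 s. A minimal sketch of a helper for repeating such a measurement, with hypothetical names `timeRuns` and `median`: it times each run with the monotonic `System.nanoTime` and summarizes with the median, which is less sensitive to one slow run than the mean.

```scala
object TimingSketch {
  // Runs `body` `runs` times and returns the elapsed wall-clock seconds of each run.
  // System.nanoTime is monotonic, unlike System.currentTimeMillis.
  def timeRuns(runs: Int)(body: => Any): Seq[Double] =
    (1 to runs).map { _ =>
      val t0 = System.nanoTime()
      body
      (System.nanoTime() - t0) / 1e9
    }

  // Middle value of the sorted samples (average of the two middle values for even counts)
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "need at least one sample")
    val s = xs.sorted
    val n = s.length
    if (n % 2 == 1) s(n / 2) else (s(n / 2 - 1) + s(n / 2)) / 2
  }
}
```

In `spark-shell` this could be used as, for example, `TimingSketch.median(TimingSketch.timeRuns(5) { sc.range(0, 10000000, 1, 15).collect() })`. For the three timings above, the median is 0.579 s.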
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165324725
Merged build finished. Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165324728
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47886/
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47944123
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
--- End diff --
Sure, will do the change. Thanks!
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-166153162
Merged build finished. Test PASSed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165238312
The high-level structure of this looks pretty good to me. Could you also post some numbers from a microbenchmark? It would be good to make sure we're actually speeding things up.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165671255
Merged build finished. Test PASSed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165236228
**[Test build #47852 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47852/consoleFull)** for PR 10335 at commit [`2aab4d6`](https://github.com/apache/spark/commit/2aab4d648fa634ec427e37ae82b0328fad159720).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Range(`
  * `case class Range(`
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165928399
Merged build finished. Test PASSed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-166153163
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/48085/
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-166146505
**[Test build #48085 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48085/consoleFull)** for PR 10335 at commit [`36c862b`](https://github.com/apache/spark/commit/36c862b667ab811156295865883c0ce94d9c084d).
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165671192
**[Test build #47972 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47972/consoleFull)** for PR 10335 at commit [`576fea9`](https://github.com/apache/spark/commit/576fea91c18e91e3357ea2d2f44631033b66c71a).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Range(`
  * `case class Range(`
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165674792
Let me add the fix from https://github.com/apache/spark/pull/10337 and try the function `count`.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47834244
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
--- End diff --
I don't think this is the place where we should start speeding things up :)... I'd rather have something that is easier to read.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47828506
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
--- End diff --
```xor```s are typically really hard to follow. I think this might be easier: ```safeEnd > safeStart == step > 0```
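The precedence behind this condition can be checked in plain Scala. A minimal sketch with hypothetical helper names: `>` binds tighter than `^`, `==` and `!=`, so `safeEnd > safeStart ^ step > 0` compares two Booleans, and for Booleans `a ^ b` is the same as `a != b`. The `==` form is therefore the negation of the `^` form, so adopting it as written would also mean swapping the two branches of the `if`.

```scala
object XorReadabilitySketch {
  // Parsed as (end > start) ^ (step > 0): true when the range and the step point in
  // different directions
  def directionsDifferXor(start: BigInt, end: BigInt, step: Long): Boolean =
    end > start ^ step > 0

  // The same test written with != instead of ^
  def directionsDifferNe(start: BigInt, end: BigInt, step: Long): Boolean =
    (end > start) != (step > 0)
}
```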
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165826708
@gatorsmile I have been playing around with this for a bit. Overall I think we should do this. I do have two things for you to consider.
The current approach follows the formal route: it implements a LogicalOperator/PhysicalOperator and changes the planner. It also, as you stated, reuses quite a bit of the code in ```SparkContext```. The slowness of the current ```range``` operator comes from two things: we create a normal ```Row``` for each element, which is expensive because it creates 1E9 objects, and we then convert each Row to an internal one. We could also address these two issues directly by wrapping the iterator provided by ```sc.range``` differently:
```
def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
  val logicalPlan = LogicalRDD(
    AttributeReference("id", LongType, nullable = false)() :: Nil,
    sparkContext.range(start, end, step, numPartitions).mapPartitions ({ i =>
      val unsafeRow = new UnsafeRow
      unsafeRow.pointTo(new Array[Byte](16), 1, 16)
      i.map { id =>
        unsafeRow.setLong(0, id)
        unsafeRow
      }
    }, true))(self)
  DataFrame(this, logicalPlan)
}
```
What do you think?
My second point is about the benchmarking. I have been toying with this PR and the benchmarking code, and I am not sure that the current results are as revealing as they should be. I think the ```sqlContext.range``` code in this PR is nearly as fast as the ```sc.range``` code. The big difference comes from the fact that ```collect()``` involves serialization: serializing a ```Long``` is nowhere near as expensive as serializing an ```UnsafeRow```. In my benchmark of ```sqlContext.range```, serialization accounts for about 80-90% of the execution time (you can see this in the Spark Stage Timeline).
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165671257
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47972/
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165232112
**[Test build #47852 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47852/consoleFull)** for PR 10335 at commit [`2aab4d6`](https://github.com/apache/spark/commit/2aab4d648fa634ec427e37ae82b0328fad159720).
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165672087
@marmbrus Compared with the original range API (around 30 seconds), the new version is around 33% faster. Of course, the RDD range API is still much faster.
Do you think the performance improvement is good enough? Or is there a better idea we can try?
Thanks!
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165879022
Hi @hvanhovell,
Thank you for your comments! Regarding the benchmarking, I do not have a good way to measure it. `collect()` is not a good choice when the workload is huge; I suspect it causes a lot of data movement, so the numbers are misleading when comparing the RDD range API with the DataFrame range API.
When we use the LogicalRDD-based solution, the planner will not choose a broadcast join by default, which is a potential issue.
I just tried your suggested method. It is about 2 times slower when using `count()`. I had to increase the workload to `1000000000`; when the scale is small, it is hard to compare performance, since the result can be affected by many factors.
```
scala> val startTime = System.currentTimeMillis; sqlContext.logicalRDD_Range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450466566418
endTime: Long = 1450466581232
elapsed: Double = 14.814
scala> val startTime = System.currentTimeMillis; sqlContext.logicalRDD_Range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450466583781
endTime: Long = 1450466597751
elapsed: Double = 13.97
scala> val startTime = System.currentTimeMillis; sqlContext.range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450466600825
endTime: Long = 1450466608397
elapsed: Double = 7.572
scala> val startTime = System.currentTimeMillis; sqlContext.range(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450466611679
endTime: Long = 1450466619421
elapsed: Double = 7.742
```
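As a quick check of the "2 times slower" figure, the four timings above can be averaged per variant. A minimal sketch: the object and value names are hypothetical, and the numbers are copied from the transcript.

```scala
object BenchmarkRatioSketch {
  // Elapsed seconds copied from the REPL runs above
  val logicalRddRange: Seq[Double] = Seq(14.814, 13.97)
  val rangeOperator: Seq[Double] = Seq(7.572, 7.742)

  def mean(xs: Seq[Double]): Double = xs.sum / xs.length

  // How many times slower the LogicalRDD-based version was, on average
  val slowdown: Double = mean(logicalRddRange) / mean(rangeOperator)
}
```

The means are 14.392 s and 7.657 s, a slowdown of roughly 1.88x. With only two runs per variant, this is indicative rather than conclusive.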
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47828311
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode
+{
+ override def outputsUnsafeRows: Boolean = true
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext
+ .sparkContext
+ .parallelize(0 until numSlices, numSlices)
+ .mapPartitionsWithIndex((i, _) => {
+ val partitionStart = (i * numElements) / numSlices * step + start
+ val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+ def getSafeMargin(bi: BigInt): Long =
+ if (bi.isValidLong) {
+ bi.toLong
+ } else if (bi > 0) {
+ Long.MaxValue
+ } else {
+ Long.MinValue
+ }
+ val safePartitionStart = getSafeMargin(partitionStart)
+ val safePartitionEnd = getSafeMargin(partitionEnd)
+ val bufferHolder = new BufferHolder(LongType.defaultSize)
+ val unsafeRow = new UnsafeRow
+
+ new Iterator[InternalRow] {
+ private[this] var number: Long = safePartitionStart
+ private[this] var overflow: Boolean = false
+
+ override def hasNext =
+ if (!overflow) {
+ if (step > 0) {
+ number < safePartitionEnd
+ } else {
+ number > safePartitionEnd
+ }
+ } else false
+
+ override def next() = {
+ val ret = number
+ number += step
+ if (number < ret ^ step < 0) {
+ // we have Long.MaxValue + Long.MaxValue < Long.MaxValue
+ // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step
+ // back, we are pretty sure that we have an overflow.
+ overflow = true
+ }
+
+ bufferHolder.reset()
+ unsafeRow.pointTo(bufferHolder.buffer, 1, bufferHolder.totalSize())
--- End diff --
@hvanhovell Thank you! You are right, let me move it to the construction of the Iterator.
When writing the prototype, I used a 16-byte array. I was just not sure whether the code should use the existing library here, so I changed it to `BufferHolder`.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47877642
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
--- End diff --
We should simplify it. This is a one-time computation, so we shouldn't care about its performance.
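One possible simplification, sketched under stated assumptions: the hypothetical `numElements` below counts the elements with a single ceiling division on `BigInt` instead of the `%` / `^` test in the diff. Unlike the diff, it clamps a range that runs against the step's direction to 0 elements rather than returning a negative count; whether that suits the physical operator is an assumption.

```scala
object RangeCountSketch {
  // Number of elements in `start until end by step`, computed with BigInt so that
  // end - start cannot overflow a Long.
  def numElements(start: Long, end: Long, step: Long): BigInt = {
    require(step != 0, "step cannot be 0")
    val span = BigInt(end) - BigInt(start)
    // BigInt division truncates toward zero; the remainder takes the sign of span
    val (quotient, remainder) = span /% BigInt(step)
    // A non-zero remainder in the step's direction means one more element fits
    val count =
      if (remainder.signum != 0 && (span.signum > 0) == (step > 0)) quotient + 1
      else quotient
    // A range that runs against the step's direction is empty
    count.max(BigInt(0))
  }
}
```

For example, `numElements(0, 10, 3)` is 4 (0, 3, 6, 9), and `numElements(Long.MinValue, Long.MaxValue, 1)` is 2^64 - 1, which does not fit in a `Long`.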
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47830128
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
+ (safeEnd - safeStart) / step
+ } else {
+ // the remainder has the same sign with range, could add 1 more
+ (safeEnd - safeStart) / step + 1
+ }
+ }
+
+ val output: Seq[Attribute] =
+ StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
+ /**
+ * Computes [[Statistics]] for this plan. The default implementation assumes the output
+ * cardinality is the product of of all child plan's cardinality, i.e. applies in the case
+ * of cartesian joins.
+ *
+ * [[LeafNode]]s must override this.
+ */
--- End diff --
You can omit Scaladoc that would be inherited.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-166153111
**[Test build #48085 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48085/consoleFull)** for PR 10335 at commit [`36c862b`](https://github.com/apache/spark/commit/36c862b667ab811156295865883c0ce94d9c084d).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Range(`
  * `case class Range(`
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165320378
retest this please
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47829468
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
--- End diff --
I guess the reason is that `xor` is faster.
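For `Boolean` operands, `^` (exclusive or) and `!=` always give the same result, so choosing between them is about readability, not behavior. Any speed difference is probably negligible for a value computed once per plan, though that is an assumption and has not been measured. In Scala, `>` binds more tightly than `^`, so the quoted condition parses as `(safeEnd > safeStart) ^ (step > 0)`. The sketch below checks both points. The object and method names are illustrative only.

```scala
object XorVsNotEquals {
  // Truth table over every pair of Booleans: a ^ b and a != b agree everywhere.
  val bools = Seq(false, true)
  val agree: Boolean =
    (for (a <- bools; b <- bools) yield (a ^ b) == (a != b)).forall(identity)

  // Without parentheses, `>` binds tighter than `^`, so these two are the same expression.
  def unparenthesized(end: Long, start: Long, step: Long): Boolean = end > start ^ step > 0
  def parenthesized(end: Long, start: Long, step: Long): Boolean = (end > start) ^ (step > 0)
}
```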
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165913724
**[Test build #48032 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48032/consoleFull)** for PR 10335 at commit [`a1abc2f`](https://github.com/apache/spark/commit/a1abc2f4f8b6bb043920ff1400c9e4a06e72dccf).
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47829608
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode
+{
--- End diff --
Nit: this should be on the previous line.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165671862
Original range API:
```
scala> val startTime = System.currentTimeMillis; sqlContext.oldRange(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450413096482
endTime: Long = 1450413125867
start: java.sql.Timestamp = 2015-12-17 20:31:36.482
end: java.sql.Timestamp = 2015-12-17 20:32:05.867
elapsed: Double = 29.385
scala> val startTime = System.currentTimeMillis; sqlContext.oldRange(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450413127738
endTime: Long = 1450413157937
start: java.sql.Timestamp = 2015-12-17 20:32:07.738
end: java.sql.Timestamp = 2015-12-17 20:32:37.937
elapsed: Double = 30.199
scala> val startTime = System.currentTimeMillis; sqlContext.oldRange(0, 10000000, 1, 15).collect(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450413159888
endTime: Long = 1450413188929
start: java.sql.Timestamp = 2015-12-17 20:32:39.888
end: java.sql.Timestamp = 2015-12-17 20:33:08.929
elapsed: Double = 29.041
```
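The one-liner above is repeated for each run. It could be folded into a small helper that evaluates an action several times and returns the wall-clock time of each run in seconds. This is a sketch, and `Timing.timeRuns` is a hypothetical name. In the REPL, the body would be the `collect()` call above.

```scala
object Timing {
  // Hypothetical helper: evaluate `body` `runs` times and return the elapsed seconds of each run.
  def timeRuns[T](runs: Int)(body: => T): Seq[Double] = {
    require(runs > 0, "runs must be positive")
    (1 to runs).map { _ =>
      val t0 = System.nanoTime() // monotonic clock, unlike System.currentTimeMillis
      body                       // the result is discarded; only the elapsed time is kept
      (System.nanoTime() - t0) / 1e9
    }
  }
}
```

Reporting every run, not a single number, makes run-to-run variance visible. The three runs above ranged from about 29.0 s to 30.2 s.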
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165888931
BTW, the benchmarks look reasonable to me. I'm okay with merging this as soon as we are happy with the implementation.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165330013
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47889/
Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165928355
**[Test build #48032 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/48032/consoleFull)** for PR 10335 at commit [`a1abc2f`](https://github.com/apache/spark/commit/a1abc2f4f8b6bb043920ff1400c9e4a06e72dccf).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class Range(`
  * `case class Range(`
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165888818
Regarding benchmarking, we typically measure this kind of stuff using [ForeachResults](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala#L27) in spark-sql-perf (which measures just the [time to pull the rows out of the iterator](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala#L616) + conversion to the external format.) We should probably add an internal version as well that avoids the conversion cost.
Regarding @hvanhovell's simplified implementation, I thought about proposing it like this. The only question is whether we will ever want to add optimizations on top of this (e.g., we could answer a count(*) on this kind of plan really quickly). Since it's already implemented, I lean towards the more logically transparent implementation. However, it might be nice to reuse the code in RDD in the physical operator, as he proposes.
Super minor point: We should probably use [UnsafeRow.createFromByteArray](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L549) instead of `pointTo`.
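The idea of measuring "just the time to pull the rows out of the iterator" can be sketched in plain Scala. The sink consumes each element without building a result collection, so the timed work is mostly the production of the rows. This is a stand-in under stated assumptions and not spark-sql-perf's `ForeachResults`. `Drain.drain` is a hypothetical name. In Spark, something like it would be applied to each partition's iterator.

```scala
object Drain {
  // Hypothetical sink: pull every element and keep only a count, so that no result
  // collection is materialized and the elapsed time reflects producing the rows.
  def drain[T](it: Iterator[T]): (Long, Double) = {
    val t0 = System.nanoTime()
    var n = 0L
    while (it.hasNext) {
      it.next() // value intentionally discarded
      n += 1
    }
    (n, (System.nanoTime() - t0) / 1e9)
  }
}
```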
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165327225
**[Test build #47889 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47889/consoleFull)** for PR 10335 at commit [`258b40a`](https://github.com/apache/spark/commit/258b40a0d39d6fa87b5db83e6f7fa3b4fe13b592).
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165330012
Merged build finished. Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47830108
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
+ (safeEnd - safeStart) / step
+ } else {
+ // the remainder has the same sign with range, could add 1 more
+ (safeEnd - safeStart) / step + 1
+ }
+ }
+
+ val output: Seq[Attribute] =
+ StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes
+ /**
+ * Computes [[Statistics]] for this plan. The default implementation assumes the output
+ * cardinality is the product of of all child plan's cardinality, i.e. applies in the case
+ * of cartesian joins.
+ *
+ * [[LeafNode]]s must override this.
+ */
+ val sizeInBytes = LongType.defaultSize * numElements
--- End diff --
This should be `protected`.
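The sketch below shows the shape the last two review comments point toward. It uses hypothetical stand-ins for Catalyst's `Statistics` and `LeafNode`, not the real classes. The size helper is `protected`, and the overriding `statistics` method has no Scaladoc of its own because the parent's documentation is inherited. The 8 bytes per row matches `LongType.defaultSize`.

```scala
object StatisticsSketch {
  // Hypothetical stand-in for Catalyst's Statistics.
  case class Statistics(sizeInBytes: BigInt)

  // Hypothetical stand-in for a plan leaf.
  abstract class LeafNode {
    /**
     * Computes [[Statistics]] for this plan. Leaf nodes must override this.
     */
    def statistics: Statistics
  }

  case class RangeLike(numElements: BigInt) extends LeafNode {
    // Implementation detail, so it stays out of the node's public surface.
    protected val sizeInBytes: BigInt = BigInt(8) * numElements

    // No Scaladoc needed here: it is inherited from LeafNode.statistics.
    override def statistics: Statistics = Statistics(sizeInBytes)
  }
}
```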
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165236262
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47852/
Test FAILed.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47828090
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode
+{
+ override def outputsUnsafeRows: Boolean = true
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext
+ .sparkContext
+ .parallelize(0 until numSlices, numSlices)
+ .mapPartitionsWithIndex((i, _) => {
+ val partitionStart = (i * numElements) / numSlices * step + start
+ val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+ def getSafeMargin(bi: BigInt): Long =
+ if (bi.isValidLong) {
+ bi.toLong
+ } else if (bi > 0) {
+ Long.MaxValue
+ } else {
+ Long.MinValue
+ }
+ val safePartitionStart = getSafeMargin(partitionStart)
+ val safePartitionEnd = getSafeMargin(partitionEnd)
+ val bufferHolder = new BufferHolder(LongType.defaultSize)
+ val unsafeRow = new UnsafeRow
+
+ new Iterator[InternalRow] {
+ private[this] var number: Long = safePartitionStart
+ private[this] var overflow: Boolean = false
+
+ override def hasNext =
+ if (!overflow) {
--- End diff --
Is it even possible to overflow? We know the `size`, `begin`, and `end` values when we create the object, so any overflow should already be detectable there.
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the pull request:
https://github.com/apache/spark/pull/10335#issuecomment-165677742
Original range API with `count` (workload **1,000,000,000** rows):
```
scala> val startTime = System.currentTimeMillis; sqlContext.oldRange(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416394240
endTime: Long = 1450416421199
start: java.sql.Timestamp = 2015-12-17 21:26:34.24
end: java.sql.Timestamp = 2015-12-17 21:27:01.199
elapsed: Double = 26.959
scala> val startTime = System.currentTimeMillis; sqlContext.oldRange(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416424035
endTime: Long = 1450416450869
start: java.sql.Timestamp = 2015-12-17 21:27:04.035
end: java.sql.Timestamp = 2015-12-17 21:27:30.869
elapsed: Double = 26.834
scala> val startTime = System.currentTimeMillis; sqlContext.oldRange(0, 1000000000, 1, 15).count(); val endTime = System.currentTimeMillis; val start = new Timestamp(startTime); val end = new Timestamp(endTime); val elapsed = (endTime - startTime)/ 1000.0
startTime: Long = 1450416452321
endTime: Long = 1450416480277
start: java.sql.Timestamp = 2015-12-17 21:27:32.321
end: java.sql.Timestamp = 2015-12-17 21:28:00.277
elapsed: Double = 27.956
```
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47829265
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---
@@ -126,6 +127,69 @@ case class Sample(
}
}
+case class Range(
+ start: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ output: Seq[Attribute])
+ extends LeafNode
+{
+ override def outputsUnsafeRows: Boolean = true
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext
+ .sparkContext
+ .parallelize(0 until numSlices, numSlices)
+ .mapPartitionsWithIndex((i, _) => {
+ val partitionStart = (i * numElements) / numSlices * step + start
+ val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start
+ def getSafeMargin(bi: BigInt): Long =
+ if (bi.isValidLong) {
+ bi.toLong
+ } else if (bi > 0) {
+ Long.MaxValue
+ } else {
+ Long.MinValue
+ }
+ val safePartitionStart = getSafeMargin(partitionStart)
+ val safePartitionEnd = getSafeMargin(partitionEnd)
+ val bufferHolder = new BufferHolder(LongType.defaultSize)
+ val unsafeRow = new UnsafeRow
+
+ new Iterator[InternalRow] {
+ private[this] var number: Long = safePartitionStart
+ private[this] var overflow: Boolean = false
+
+ override def hasNext =
+ if (!overflow) {
--- End diff --
To be honest, this is copied from the RDD range API. I have the same question as you. For safety, I kept it.
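The plain-Scala sketch below makes the overflow question concrete. It follows the per-partition logic of the quoted `doExecute`, with `Long` values standing in for `UnsafeRow`s. The object and method names are hypothetical. The sketch suggests that overflow can happen after all. The partition bounds are safe because they are computed in `BigInt` and clamped. The risk is the `number += step` after the last element. For example, with `begin = Long.MaxValue - 3`, `end = Long.MaxValue` and `step = 2`, the increment after `Long.MaxValue - 1` wraps to a negative value that still compares below `end`.

```scala
object RangePartitionSketch {
  // Clamp a BigInt bound into the Long range (mirrors getSafeMargin in the diff).
  def safeMargin(bi: BigInt): Long =
    if (bi.isValidLong) bi.toLong
    else if (bi.signum > 0) Long.MaxValue
    else Long.MinValue

  // [begin, end) bounds of slice i: computed in BigInt as in the diff, then clamped.
  def sliceBounds(start: Long, step: Long, numElements: BigInt,
                  numSlices: Int, i: Int): (Long, Long) = {
    val begin = BigInt(i) * numElements / BigInt(numSlices) * BigInt(step) + BigInt(start)
    val end = BigInt(i + 1) * numElements / BigInt(numSlices) * BigInt(step) + BigInt(start)
    (safeMargin(begin), safeMargin(end))
  }

  // Values of one slice, stopping early if `number += step` wraps around.
  def sliceIterator(begin: Long, end: Long, step: Long): Iterator[Long] = new Iterator[Long] {
    private var number: Long = begin
    private var overflow: Boolean = false

    override def hasNext: Boolean =
      !overflow && (if (step > 0) number < end else number > end)

    override def next(): Long = {
      if (!hasNext) throw new NoSuchElementException("next on exhausted range")
      val ret = number
      number += step
      // Long addition wraps silently: a step that moved "backwards" means we overflowed.
      if ((number < ret) != (step < 0)) overflow = true
      ret
    }
  }
}
```

Without the `overflow` flag, `sliceIterator(Long.MaxValue - 3, Long.MaxValue, 2)` would keep producing wrapped negative values after `Long.MaxValue - 1`. With the flag, it stops after two elements.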
[GitHub] spark pull request: [Spark-12374][SPARK-12150][SQL] Adding logical...
Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/10335#discussion_r47986886
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---
@@ -210,6 +210,37 @@ case class Sort(
override def output: Seq[Attribute] = child.output
}
+case class Range(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int) extends LeafNode {
+ require(step != 0, "step cannot be 0")
+ val numElements: BigInt = {
+ val safeStart = BigInt(start)
+ val safeEnd = BigInt(end)
+ if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) {
--- End diff --
I made the change. : ) I replaced `^` with `!=`. @hvanhovell, please check whether this looks OK to you. Thank you!