Posted to commits@spark.apache.org by gu...@apache.org on 2019/07/29 14:38:37 UTC

[spark] branch master updated: [MINOR] Trivial cleanups

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d98aa2a  [MINOR] Trivial cleanups
d98aa2a is described below

commit d98aa2a18437dcff4c0ffe3acc1fa58b27062bd0
Author: Lee Dongjin <do...@apache.org>
AuthorDate: Mon Jul 29 23:38:02 2019 +0900

    [MINOR] Trivial cleanups
    
    These are cleanups I found while working on #22282.
    
    - Remove unused value: `UnsafeArraySuite#defaultTz`
    - Remove redundant `new` modifier from the case class `KafkaSourceRDDPartition`
    - Remove unused variables from `RDD.scala`
    - Remove trailing space from `structured-streaming-kafka-integration.md`
    - Remove redundant parameter from `ArrowConvertersSuite`: `nullable` is `true` by default.
    - Remove leading empty line: `UnsafeRow`
    - Remove trailing empty line: `KafkaTestUtils`
    - Remove unthrown exception type: `UnsafeMapData`
    - Replace unused declarations: `expressions`
    - Remove duplicated default parameter: `AnalysisErrorSuite`
    - `ObjectExpressionsSuite`: remove duplicated parameters, redundant conversions, and an unused variable
    
    Closes #25251 from dongjinleekr/cleanup/201907.
    
    Authored-by: Lee Dongjin <do...@apache.org>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 32 ++++++++---------
 .../apache/spark/InternalAccumulatorSuite.scala    |  2 +-
 docs/structured-streaming-kafka-integration.md     |  4 +--
 .../sql/catalyst/expressions/UnsafeMapData.java    |  2 +-
 .../spark/sql/catalyst/expressions/package.scala   |  4 +--
 .../expressions/ObjectExpressionsSuite.scala       |  6 ++--
 .../spark/sql/catalyst/util/UnsafeArraySuite.scala |  1 -
 .../spark/sql/sources/FilteredScanSuite.scala      | 40 +++++++++++-----------
 8 files changed, 44 insertions(+), 47 deletions(-)
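
Most of the changes in RDD.scala, package.scala and FilteredScanSuite.scala below follow a single pattern: a lambda parameter or pattern binder that is never read is replaced with the `_` placeholder, which silences unused-symbol warnings without changing behavior. A minimal standalone sketch of the idea (hypothetical code, not taken from the Spark sources):

    object UnusedParamSketch {
      // Before: `ctx` and `idx` are bound but never read, so the compiler/linter may flag them.
      val before: (String, Int, Iterator[Int]) => Iterator[Int] =
        (ctx, idx, iter) => iter.map(_ + 1)

      // After: unused parameters become `_`; same behavior, and the intent is explicit.
      val after: (String, Int, Iterator[Int]) => Iterator[Int] =
        (_, _, iter) => iter.map(_ + 1)
    }

The same applies to unused pattern binders, e.g. `case Some(_) if ... =>` instead of `case Some(p) if ... =>` when `p` is never referenced in the case body.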

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1b67e99..eafe3b1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -372,7 +372,7 @@ abstract class RDD[T: ClassTag](
    */
   def map[U: ClassTag](f: T => U): RDD[U] = withScope {
     val cleanF = sc.clean(f)
-    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
+    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
   }
 
   /**
@@ -381,7 +381,7 @@ abstract class RDD[T: ClassTag](
    */
   def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
     val cleanF = sc.clean(f)
-    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
+    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
   }
 
   /**
@@ -391,7 +391,7 @@ abstract class RDD[T: ClassTag](
     val cleanF = sc.clean(f)
     new MapPartitionsRDD[T, T](
       this,
-      (context, pid, iter) => iter.filter(cleanF),
+      (_, _, iter) => iter.filter(cleanF),
       preservesPartitioning = true)
   }
 
@@ -402,16 +402,16 @@ abstract class RDD[T: ClassTag](
     def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
       // Create an instance of external append only map which ignores values.
       val map = new ExternalAppendOnlyMap[T, Null, Null](
-        createCombiner = value => null,
+        createCombiner = _ => null,
         mergeValue = (a, b) => a,
         mergeCombiners = (a, b) => a)
       map.insertAll(partition.map(_ -> null))
       map.iterator.map(_._1)
     }
     partitioner match {
-      case Some(p) if numPartitions == partitions.length =>
+      case Some(_) if numPartitions == partitions.length =>
         mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
-      case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+      case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
     }
   }
 
@@ -684,7 +684,7 @@ abstract class RDD[T: ClassTag](
    * Return an RDD created by coalescing all elements within each partition into an array.
    */
   def glom(): RDD[Array[T]] = withScope {
-    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
+    new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))
   }
 
   /**
@@ -814,7 +814,7 @@ abstract class RDD[T: ClassTag](
     val cleanedF = sc.clean(f)
     new MapPartitionsRDD(
       this,
-      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
+      (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
       preservesPartitioning)
   }
 
@@ -836,7 +836,7 @@ abstract class RDD[T: ClassTag](
       isOrderSensitive: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
-      (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
+      (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
       preservesPartitioning = preservesPartitioning,
       isOrderSensitive = isOrderSensitive)
   }
@@ -849,7 +849,7 @@ abstract class RDD[T: ClassTag](
       preservesPartitioning: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
-      (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter),
+      (_: TaskContext, _: Int, iter: Iterator[T]) => f(iter),
       preservesPartitioning)
   }
 
@@ -866,7 +866,7 @@ abstract class RDD[T: ClassTag](
     val cleanedF = sc.clean(f)
     new MapPartitionsRDD(
       this,
-      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
       preservesPartitioning)
   }
 
@@ -1040,7 +1040,7 @@ abstract class RDD[T: ClassTag](
       }
     }
     var jobResult: Option[T] = None
-    val mergeResult = (index: Int, taskResult: Option[T]) => {
+    val mergeResult = (_: Int, taskResult: Option[T]) => {
       if (taskResult.isDefined) {
         jobResult = jobResult match {
           case Some(value) => Some(f(value, taskResult.get))
@@ -1110,7 +1110,7 @@ abstract class RDD[T: ClassTag](
     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
     val cleanOp = sc.clean(op)
     val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
-    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+    val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
     sc.runJob(this, foldPartition, mergeResult)
     jobResult
   }
@@ -1136,7 +1136,7 @@ abstract class RDD[T: ClassTag](
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
     val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+    val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
     jobResult
   }
@@ -1201,7 +1201,7 @@ abstract class RDD[T: ClassTag](
       timeout: Long,
       confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
     require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
-    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
+    val countElements: (TaskContext, Iterator[T]) => Long = { (_, iter) =>
       var result = 0L
       while (iter.hasNext) {
         result += 1L
@@ -1244,7 +1244,7 @@ abstract class RDD[T: ClassTag](
     if (elementClassTag.runtimeClass.isArray) {
       throw new SparkException("countByValueApprox() does not support arrays")
     }
-    val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (ctx, iter) =>
+    val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (_, iter) =>
       val map = new OpenHashMap[T, Long]
       iter.foreach {
         t => map.changeValue(t, 1L, _ + 1L)
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 62824a5..e7eef8e 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -71,7 +71,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
         taskAccum.value.get.asInstanceOf[Long]
       }
       // Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
-      assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+      assert(taskAccumValues.sorted === (1L to numPartitions))
     }
     rdd.count()
     listener.awaitNextJobCompletion()
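
The `.toSeq` dropped in the hunk above is redundant: `1L to numPartitions` already yields a NumericRange[Long], which is a Seq[Long], and Scala sequence equality compares elements rather than concrete collection types. A small sketch of why the comparison is unchanged (hypothetical values, not the suite's actual data):

    object RangeSeqSketch extends App {
      val sorted = Seq(1L, 2L, 3L)
      assert(sorted == (1L to 3L))        // NumericRange is already a Seq; equality is element-wise
      assert(sorted == (1L to 3L).toSeq)  // identical result; the .toSeq added nothing
    }
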
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index b0009e0..9a6e302 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -404,7 +404,7 @@ The following configurations are optional:
   <td>spark-kafka-source</td>
   <td>streaming and batch</td>
   <td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming
-  queries. If "kafka.group.id" is set, this option will be ignored. </td>
+  queries. If "kafka.group.id" is set, this option will be ignored.</td>
 </tr>
 <tr>
   <td>kafka.group.id</td>
@@ -421,7 +421,7 @@ The following configurations are optional:
   same group id are likely interfere with each other causing each query to read only part of the
   data. This may also occur when queries are started/restarted in quick succession. To minimize such
   issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
-  be very small. When this is set, option "groupIdPrefix" will be ignored. </td>
+  be very small. When this is set, option "groupIdPrefix" will be ignored.</td>
 </tr>
 </table>
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
index 60ac69b..58973e7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeMapData.java
@@ -143,7 +143,7 @@ public final class UnsafeMapData extends MapData implements Externalizable, Kryo
   }
 
   @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+  public void readExternal(ObjectInput in) throws IOException {
     this.baseOffset = BYTE_ARRAY_OFFSET;
     this.sizeInBytes = in.readInt();
     this.baseObject = new byte[sizeInBytes];
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 932c364..7164b6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -177,7 +177,7 @@ package object expressions  {
             a => (resolver(dbPart, a.qualifier.head) && resolver(tblPart, a.qualifier.last))
           }
           (attributes, nestedFields)
-        case all =>
+        case _ =>
           (Seq.empty, Seq.empty)
       }
 
@@ -197,7 +197,7 @@ package object expressions  {
               resolver(qualifier, a.qualifier.last)
             }
             (attributes, nestedFields)
-          case all =>
+          case _ =>
             (Seq.empty[Attribute], Seq.empty[String])
         }
       }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index b6ca52f..a171885 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -330,9 +330,9 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         case null =>
           assert(result.asInstanceOf[ArrayData].array.toSeq == expected)
         case l if classOf[java.util.List[_]].isAssignableFrom(l) =>
-          assert(result.asInstanceOf[java.util.List[_]].asScala.toSeq == expected)
+          assert(result.asInstanceOf[java.util.List[_]].asScala == expected)
         case s if classOf[Seq[_]].isAssignableFrom(s) =>
-          assert(result.asInstanceOf[Seq[_]].toSeq == expected)
+          assert(result.asInstanceOf[Seq[_]] == expected)
         case s if classOf[scala.collection.Set[_]].isAssignableFrom(s) =>
           assert(result.asInstanceOf[scala.collection.Set[_]] == expected.toSet)
       }
@@ -532,8 +532,6 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = {
     import org.apache.spark.sql.catalyst.ScalaReflection._
 
-    val curId = new java.util.concurrent.atomic.AtomicInteger()
-
     def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression =
          localTypeOf[V].dealias match {
        case t if t <:< localTypeOf[java.lang.Integer] =>
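
Similarly, the two `.toSeq` calls removed from ObjectExpressionsSuite above were no-ops for the equality checks: `asScala` on a java.util.List already yields a mutable.Buffer (a Seq), and a value cast to Seq[_] compares element-wise either way. A minimal sketch, assuming the Scala 2.12 collection converters Spark used at the time:

    import scala.collection.JavaConverters._

    object AsScalaSketch extends App {
      val javaList = java.util.Arrays.asList(1, 2, 3)
      // asScala returns a mutable.Buffer, which is already a Seq; equality is element-wise.
      assert(javaList.asScala == Seq(1, 2, 3))
      assert(javaList.asScala.toSeq == Seq(1, 2, 3))  // same outcome; the .toSeq was redundant
    }
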
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
index 61ce63f..0b9e023 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/UnsafeArraySuite.scala
@@ -40,7 +40,6 @@ class UnsafeArraySuite extends SparkFunSuite {
   val dateArray = Array(
     DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1")).get,
     DateTimeUtils.stringToDate(UTF8String.fromString("2016-7-26")).get)
-  private def defaultTz = DateTimeUtils.defaultTimeZone()
   private def defaultZoneId = ZoneId.systemDefault()
   val timestampArray = Array(
     DateTimeUtils.stringToTimestamp(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index daac207..2b39bda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -51,13 +51,13 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
     def unhandled(filter: Filter): Boolean = {
       filter match {
-        case EqualTo(col, v) => col == "b"
-        case EqualNullSafe(col, v) => col == "b"
-        case LessThan(col, v: Int) => col == "b"
-        case LessThanOrEqual(col, v: Int) => col == "b"
-        case GreaterThan(col, v: Int) => col == "b"
-        case GreaterThanOrEqual(col, v: Int) => col == "b"
-        case In(col, values) => col == "b"
+        case EqualTo(col, _) => col == "b"
+        case EqualNullSafe(col, _) => col == "b"
+        case LessThan(col, _: Int) => col == "b"
+        case LessThanOrEqual(col, _: Int) => col == "b"
+        case GreaterThan(col, _: Int) => col == "b"
+        case GreaterThanOrEqual(col, _: Int) => col == "b"
+        case In(col, _) => col == "b"
         case IsNull(col) => col == "b"
         case IsNotNull(col) => col == "b"
         case Not(pred) => unhandled(pred)
@@ -107,7 +107,7 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
       case StringEndsWith("c", v) => _.endsWith(v)
       case StringContains("c", v) => _.contains(v)
       case EqualTo("c", v: String) => _.equals(v)
-      case EqualTo("c", v: UTF8String) => sys.error("UTF8String should not appear in filters")
+      case EqualTo("c", _: UTF8String) => sys.error("UTF8String should not appear in filters")
       case In("c", values) => (s: String) => values.map(_.asInstanceOf[String]).toSet.contains(s)
       case _ => (c: String) => true
     }
@@ -152,39 +152,39 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
   sqlTest(
     "SELECT * FROM oneToTenFiltered",
     (1 to 10).map(i => Row(i, i * 2, (i - 1 + 'a').toChar.toString * 5
-      + (i - 1 + 'a').toChar.toString.toUpperCase(Locale.ROOT) * 5)).toSeq)
+      + (i - 1 + 'a').toChar.toString.toUpperCase(Locale.ROOT) * 5)))
 
   sqlTest(
     "SELECT a, b FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i, i * 2)).toSeq)
+    (1 to 10).map(i => Row(i, i * 2)))
 
   sqlTest(
     "SELECT b, a FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i * 2, i)).toSeq)
+    (1 to 10).map(i => Row(i * 2, i)))
 
   sqlTest(
     "SELECT a FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i)).toSeq)
+    (1 to 10).map(i => Row(i)))
 
   sqlTest(
     "SELECT b FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i * 2)).toSeq)
+    (1 to 10).map(i => Row(i * 2)))
 
   sqlTest(
     "SELECT a * 2 FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i * 2)).toSeq)
+    (1 to 10).map(i => Row(i * 2)))
 
   sqlTest(
     "SELECT A AS b FROM oneToTenFiltered",
-    (1 to 10).map(i => Row(i)).toSeq)
+    (1 to 10).map(i => Row(i)))
 
   sqlTest(
     "SELECT x.b, y.a FROM oneToTenFiltered x JOIN oneToTenFiltered y ON x.a = y.b",
-    (1 to 5).map(i => Row(i * 4, i)).toSeq)
+    (1 to 5).map(i => Row(i * 4, i)))
 
   sqlTest(
     "SELECT x.a, y.b FROM oneToTenFiltered x JOIN oneToTenFiltered y ON x.a = y.b",
-    (2 to 10 by 2).map(i => Row(i, i)).toSeq)
+    (2 to 10 by 2).map(i => Row(i, i)))
 
   sqlTest(
     "SELECT a, b FROM oneToTenFiltered WHERE a = 1",
@@ -208,11 +208,11 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
 
   sqlTest(
     "SELECT a, b FROM oneToTenFiltered WHERE a IS NOT NULL",
-    (1 to 10).map(i => Row(i, i * 2)).toSeq)
+    (1 to 10).map(i => Row(i, i * 2)))
 
   sqlTest(
     "SELECT a, b FROM oneToTenFiltered WHERE a < 5 AND a > 1",
-    (2 to 4).map(i => Row(i, i * 2)).toSeq)
+    (2 to 4).map(i => Row(i, i * 2)))
 
   sqlTest(
     "SELECT a, b FROM oneToTenFiltered WHERE a < 3 OR a > 8",
@@ -220,7 +220,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
 
   sqlTest(
     "SELECT a, b FROM oneToTenFiltered WHERE NOT (a < 6)",
-    (6 to 10).map(i => Row(i, i * 2)).toSeq)
+    (6 to 10).map(i => Row(i, i * 2)))
 
   sqlTest(
     "SELECT a, b, c FROM oneToTenFiltered WHERE c like 'c%'",

