You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/03 10:49:01 UTC

spark git commit: [SPARK-6560][CORE] Do not suppress exceptions from writer.write.

Repository: spark
Updated Branches:
  refs/heads/master 82701ee25 -> b0d884f04


[SPARK-6560][CORE] Do not suppress exceptions from writer.write.

If there is a failure in the Hadoop backend while calling
writer.write, we should remember this original exception,
and try to call writer.close(), but if that fails as well,
still report the original exception.

Note that, if writer.write fails, it is likely that writer
was left in an invalid state, and so actually makes it more
likely that writer.close will also fail. Which just increases
the chances for writer.write's exception to be suppressed.

This patch introduces an admittedly potentially too cute
Utils.tryWithSafeFinally method to handle the try/finally
gyrations.

Author: Stephen Haberman <st...@exigencecorp.com>

Closes #5223 from stephenh/do_not_suppress_writer_exception and squashes the following commits:

c7ad53f [Stephen Haberman] [SPARK-6560][CORE] Do not suppress exceptions from writer.write.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0d884f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0d884f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0d884f0

Branch: refs/heads/master
Commit: b0d884f044fea1c954da77073f3556cd9ab1e922
Parents: 82701ee
Author: Stephen Haberman <st...@exigencecorp.com>
Authored: Fri Apr 3 09:48:37 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Apr 3 09:48:37 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     | 11 +++--
 .../org/apache/spark/api/python/PythonRDD.scala |  8 ++--
 .../apache/spark/broadcast/HttpBroadcast.scala  | 19 +++++---
 .../master/FileSystemPersistenceEngine.scala    |  5 ++-
 .../deploy/rest/StandaloneRestClient.scala      |  8 +++-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  8 +++-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  9 ++--
 .../shuffle/IndexShuffleBlockManager.scala      |  6 +--
 .../spark/storage/BlockObjectWriter.scala       | 16 ++++---
 .../org/apache/spark/storage/DiskStore.scala    | 18 ++++----
 .../scala/org/apache/spark/util/Utils.scala     | 46 ++++++++++++++++++--
 .../spark/util/collection/ExternalSorter.scala  | 26 +++++------
 12 files changed, 118 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index c9426c5..5718951 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
   def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
     val out = new ByteArrayOutputStream
     val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
-    // Since statuses can be modified in parallel, sync on it
-    statuses.synchronized {
-      objOut.writeObject(statuses)
+    Utils.tryWithSafeFinally {
+      // Since statuses can be modified in parallel, sync on it
+      statuses.synchronized {
+        objOut.writeObject(statuses)
+      }
+    } {
+      objOut.close()
     }
-    objOut.close()
     out.toByteArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 36cf2af..b1ffba4 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -614,9 +614,9 @@ private[spark] object PythonRDD extends Logging {
         try {
           val sock = serverSocket.accept()
           val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
-          try {
+          Utils.tryWithSafeFinally {
             writeIteratorToStream(items, out)
-          } finally {
+          } {
             out.close()
           }
         } catch {
@@ -862,9 +862,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
     val file = File.createTempFile("broadcast", "", dir)
     path = file.getAbsolutePath
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       Utils.copyStream(in, out)
-    } finally {
+    } {
       out.close()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 74ccfa6..4457c75 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -165,7 +165,7 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def write(id: Long, value: Any) {
     val file = getFile(id)
     val fileOutputStream = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       val out: OutputStream = {
         if (compress) {
           compressionCodec.compressedOutputStream(fileOutputStream)
@@ -175,10 +175,13 @@ private[broadcast] object HttpBroadcast extends Logging {
       }
       val ser = SparkEnv.get.serializer.newInstance()
       val serOut = ser.serializeStream(out)
-      serOut.writeObject(value)
-      serOut.close()
+      Utils.tryWithSafeFinally {
+        serOut.writeObject(value)
+      } {
+        serOut.close()
+      }
       files += file
-    } finally {
+    } {
       fileOutputStream.close()
     }
   }
@@ -212,9 +215,11 @@ private[broadcast] object HttpBroadcast extends Logging {
     }
     val ser = SparkEnv.get.serializer.newInstance()
     val serIn = ser.deserializeStream(in)
-    val obj = serIn.readObject[T]()
-    serIn.close()
-    obj
+    Utils.tryWithSafeFinally {
+      serIn.readObject[T]()
+    } {
+      serIn.close()
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 32499b3..f459ed5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
 import akka.serialization.Serialization
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 
 /**
@@ -59,9 +60,9 @@ private[master] class FileSystemPersistenceEngine(
     val serializer = serialization.findSerializerFor(value)
     val serialized = serializer.toBinary(value)
     val out = new FileOutputStream(file)
-    try {
+    Utils.tryWithSafeFinally {
       out.write(serialized)
-    } finally {
+    } {
       out.close()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
index 420442f..a3539e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
 import com.google.common.base.Charsets
 
 import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils
 
 /**
  * A client that submits applications to the standalone Master using a REST protocol.
@@ -148,8 +149,11 @@ private[deploy] class StandaloneRestClient extends Logging {
     conn.setRequestProperty("charset", "utf-8")
     conn.setDoOutput(true)
     val out = new DataOutputStream(conn.getOutputStream)
-    out.write(json.getBytes(Charsets.UTF_8))
-    out.close()
+    Utils.tryWithSafeFinally {
+      out.write(json.getBytes(Charsets.UTF_8))
+    } {
+      out.close()
+    }
     readResponse(conn)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 1c13e2c..760c0fa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
 
@@ -112,8 +113,11 @@ private[spark] object CheckpointRDD extends Logging {
     }
     val serializer = env.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
-    serializeStream.writeAll(iterator)
-    serializeStream.close()
+    Utils.tryWithSafeFinally {
+      serializeStream.writeAll(iterator)
+    } {
+      serializeStream.close()
+    }
 
     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.exists(finalOutputPath)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 6b4f097..bf1303d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -995,7 +995,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       require(writer != null, "Unable to obtain RecordWriter")
       var recordsWritten = 0L
-      try {
+      Utils.tryWithSafeFinally {
         while (iter.hasNext) {
           val pair = iter.next()
           writer.write(pair._1, pair._2)
@@ -1004,7 +1004,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } finally {
+      } {
         writer.close(hadoopContext)
       }
       committer.commitTask(hadoopContext)
@@ -1068,7 +1068,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, taskAttemptId)
       writer.open()
       var recordsWritten = 0L
-      try {
+
+      Utils.tryWithSafeFinally {
         while (iter.hasNext) {
           val record = iter.next()
           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1077,7 +1078,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
           recordsWritten += 1
         }
-      } finally {
+      } {
         writer.close()
       }
       writer.commit()

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 50edb5a..a1741e2 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.storage._
+import org.apache.spark.util.Utils
 
 import IndexShuffleBlockManager.NOOP_REDUCE_ID
 
@@ -78,16 +79,15 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
   def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
     val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
-    try {
+    Utils.tryWithSafeFinally {
       // We take in lengths of each block, need to convert it to offsets.
       var offset = 0L
       out.writeLong(offset)
-
       for (length <- lengths) {
         offset += length
         out.writeLong(offset)
       }
-    } finally {
+    } {
       out.close()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index f703e50..0dfc91d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -23,6 +23,7 @@ import java.nio.channels.FileChannel
 import org.apache.spark.Logging
 import org.apache.spark.serializer.{SerializationStream, Serializer}
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.util.Utils
 
 /**
  * An interface for writing JVM objects to some underlying storage. This interface allows
@@ -140,14 +141,17 @@ private[spark] class DiskBlockObjectWriter(
 
   override def close() {
     if (initialized) {
-      if (syncWrites) {
-        // Force outstanding writes to disk and track how long it takes
-        objOut.flush()
-        callWithTiming {
-          fos.getFD.sync()
+      Utils.tryWithSafeFinally {
+        if (syncWrites) {
+          // Force outstanding writes to disk and track how long it takes
+          objOut.flush()
+          callWithTiming {
+            fos.getFD.sync()
+          }
         }
+      } {
+        objOut.close()
       }
-      objOut.close()
 
       channel = null
       bs = null

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 61ef5ff..4b232ae 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -46,10 +46,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val channel = new FileOutputStream(file).getChannel
-    while (bytes.remaining > 0) {
-      channel.write(bytes)
+    Utils.tryWithSafeFinally {
+      while (bytes.remaining > 0) {
+        channel.write(bytes)
+      }
+    } {
+      channel.close()
     }
-    channel.close()
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
@@ -75,9 +78,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
     try {
-      try {
+      Utils.tryWithSafeFinally {
         blockManager.dataSerializeStream(blockId, outputStream, values)
-      } finally {
+      } {
         // Close outputStream here because it should be closed before file is deleted.
         outputStream.close()
       }
@@ -106,8 +109,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
 
   private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
     val channel = new RandomAccessFile(file, "r").getChannel
-
-    try {
+    Utils.tryWithSafeFinally {
       // For small files, directly read rather than memory map
       if (length < minMemoryMapBytes) {
         val buf = ByteBuffer.allocate(length.toInt)
@@ -123,7 +125,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
       } else {
         Some(channel.map(MapMode.READ_ONLY, offset, length))
       }
-    } finally {
+    } {
       channel.close()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bb8bd10..7c85e28 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -313,7 +313,7 @@ private[spark] object Utils extends Logging {
                  transferToEnabled: Boolean = false): Long =
   {
     var count = 0L
-    try {
+    tryWithSafeFinally {
       if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
         && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy performance.
@@ -353,7 +353,7 @@ private[spark] object Utils extends Logging {
         }
       }
       count
-    } finally {
+    } {
       if (closeStreams) {
         try {
           in.close()
@@ -1214,6 +1214,44 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code, then a finally block, but if exceptions happen in
+   * the finally block, do not suppress the original exception.
+   *
+   * This is primarily an issue with `finally { out.close() }` blocks, where
+   * close needs to be called to clean up `out`, but if an exception happened
+   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+   * fail as well. This would then suppress the original/likely more meaningful
+   * exception from the original `out.write` call.
+   */
+  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
+    // It would be nice to find a method on Try that did this
+    var originalThrowable: Throwable = null
+    try {
+      block
+    } catch {
+      case t: Throwable =>
+        // Purposefully not using NonFatal, because even fatal exceptions
+        // we don't want to have our finallyBlock suppress
+        originalThrowable = t
+        throw originalThrowable
+    } finally {
+      try {
+        finallyBlock
+      } catch {
+        case t: Throwable =>
+          if (originalThrowable != null) {
+            // We could do originalThrowable.addSuppressed(t), but it's
+            // not available in JDK 1.6.
+            logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
+            throw originalThrowable
+          } else {
+            throw t
+          }
+      }
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when
@@ -2074,7 +2112,7 @@ private[spark] class RedirectThread(
   override def run() {
     scala.util.control.Exception.ignoring(classOf[IOException]) {
       // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
-      try {
+      Utils.tryWithSafeFinally {
         val buf = new Array[Byte](1024)
         var len = in.read(buf)
         while (len != -1) {
@@ -2082,7 +2120,7 @@ private[spark] class RedirectThread(
           out.flush()
           len = in.read(buf)
         }
-      } finally {
+      } {
         if (propagateEof) {
           out.close()
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0d884f0/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 7bd3c78..035f376 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -728,25 +728,19 @@ private[spark] class ExternalSorter[K, V, C](
       // this simple we spill out the current in-memory collection so that everything is in files.
       spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
       partitionWriters.foreach(_.commitAndClose())
-      var out: FileOutputStream = null
-      var in: FileInputStream = null
+      val out = new FileOutputStream(outputFile, true)
       val writeStartTime = System.nanoTime
-      try {
-        out = new FileOutputStream(outputFile, true)
+      util.Utils.tryWithSafeFinally {
         for (i <- 0 until numPartitions) {
-          in = new FileInputStream(partitionWriters(i).fileSegment().file)
-          val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
-          in.close()
-          in = null
-          lengths(i) = size
-        }
-      } finally {
-        if (out != null) {
-          out.close()
-        }
-        if (in != null) {
-          in.close()
+          val in = new FileInputStream(partitionWriters(i).fileSegment().file)
+          util.Utils.tryWithSafeFinally {
+            lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
+          } {
+            in.close()
+          }
         }
+      } {
+        out.close()
         context.taskMetrics.shuffleWriteMetrics.foreach(
           _.incShuffleWriteTime(System.nanoTime - writeStartTime))
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org