Posted to commits@spark.apache.org by rx...@apache.org on 2014/12/22 07:10:22 UTC

spark git commit: [SPARK-2075][Core] Make the compiler generate the same bytecode for Hadoop 1.+ and Hadoop 2.+

Repository: spark
Updated Branches:
  refs/heads/master c6a3c0d50 -> 6ee6aa70b


[SPARK-2075][Core] Make the compiler generate the same bytecode for Hadoop 1.+ and Hadoop 2.+

`NullWritable` is a `Comparable` rather than a `Comparable[NullWritable]` in Hadoop 1.+, so the compiler cannot find an implicit Ordering for it and ends up generating different anonymous classes for `saveAsTextFile` under Hadoop 1.+ and Hadoop 2.+. To make the compiler emit the same bytecode in both cases, we supply the Ordering for `NullWritable` explicitly.
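
For readers unfamiliar with the mechanism, here is a minimal, self-contained sketch (not part of the patch; `pairFunctions` and the two `KeyLike*` classes are made-up stand-ins for `rddToPairRDDFunctions` and `NullWritable`) showing how an implicit `Ordering` parameter with a `null` default resolves differently for the two shapes of `Comparable`, and how passing it explicitly pins the result:
```scala
object OrderingResolutionSketch {
  // Stand-in for PairRDDFunctions: only records which Ordering it received.
  class FakePairFunctions[K](val usedOrdering: Ordering[K])

  // Mirrors the shape of RDD.rddToPairRDDFunctions: an implicit Ordering
  // parameter that falls back to null when no instance can be found.
  def pairFunctions[K, V](pairs: Seq[(K, V)])
      (implicit ord: Ordering[K] = null): FakePairFunctions[K] =
    new FakePairFunctions[K](ord)

  // Hadoop 1-like key: Comparable without the matching type argument.
  final class KeyLikeHadoop1 extends Comparable[Any] {
    def compareTo(o: Any): Int = 0
  }

  // Hadoop 2-like key: Comparable[KeyLikeHadoop2].
  final class KeyLikeHadoop2 extends Comparable[KeyLikeHadoop2] {
    def compareTo(o: KeyLikeHadoop2): Int = 0
  }

  def main(args: Array[String]): Unit = {
    // No Ordering[KeyLikeHadoop1] can be found, so the default null is used.
    println(pairFunctions(Seq(new KeyLikeHadoop1 -> 1)).usedOrdering)
    // Ordering.ordered is resolved here, so the call site's bytecode differs
    // from the null-default case above.
    println(pairFunctions(Seq(new KeyLikeHadoop2 -> 1)).usedOrdering)
    // Passing the implicit explicitly removes the version-dependent lookup.
    println(pairFunctions(Seq(new KeyLikeHadoop2 -> 1))(null).usedOrdering)
  }
}
```
Passing the implicit argument list explicitly is the same trick the patch applies below with `RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)`.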

I used the following commands to confirm that the generated bytecode is the same.
```
mvn -Dhadoop.version=1.2.1 -DskipTests clean package -pl core -am
javap -private -c -classpath core/target/scala-2.10/classes org.apache.spark.rdd.RDD > ~/hadoop1.txt

mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package -pl core -am
javap -private -c -classpath core/target/scala-2.10/classes org.apache.spark.rdd.RDD > ~/hadoop2.txt

diff ~/hadoop1.txt ~/hadoop2.txt
```

However, the compiler still generates different bytecode for classes that call methods of `JobContext`/`TaskAttemptContext`: `JobContext`/`TaskAttemptContext` is a class in Hadoop 1.+, so such calls compile to `invokevirtual`, while it is an interface in Hadoop 2.+, so the same calls compile to `invokeinterface`.

To fix it, we can use reflection to call `JobContext/TaskAttemptContext.getConfiguration`.
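
For illustration, here is a minimal sketch of that reflective access (the `HadoopConfAccess` name is just a placeholder; the patch below adds the equivalent helper as `SparkHadoopUtil.getConfigurationFromJobContext`):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.JobContext

object HadoopConfAccess {
  // Resolve getConfiguration at runtime so the call site contains no direct
  // invokevirtual (Hadoop 1.+, where JobContext is a class) or invokeinterface
  // (Hadoop 2.+, where it is an interface); the compiled bytecode is then the
  // same against either Hadoop version.
  def getConfiguration(context: JobContext): Configuration = {
    val method = context.getClass.getMethod("getConfiguration")
    method.invoke(context).asInstanceOf[Configuration]
  }
}
```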

Author: zsxwing <zs...@gmail.com>

Closes #3740 from zsxwing/SPARK-2075 and squashes the following commits:

39d9df2 [zsxwing] Fix the code style
e4ad8b5 [zsxwing] Use null for the implicit Ordering
734bac9 [zsxwing] Explicitly set the implicit parameters
ca03559 [zsxwing] Use reflection to access JobContext/TaskAttemptContext.getConfiguration
fa40db0 [zsxwing] Add an Ordering for NullWritable to make the compiler generate same byte codes for RDD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ee6aa70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ee6aa70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ee6aa70

Branch: refs/heads/master
Commit: 6ee6aa70b7d52408cc66bd1434cbeae3212e3f01
Parents: c6a3c0d
Author: zsxwing <zs...@gmail.com>
Authored: Sun Dec 21 22:10:19 2014 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Dec 21 22:10:19 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 12 +++++++++++
 .../input/FixedLengthBinaryInputFormat.scala    |  3 ++-
 .../input/FixedLengthBinaryRecordReader.scala   |  3 ++-
 .../apache/spark/input/PortableDataStream.scala |  4 +++-
 .../spark/input/WholeTextFileRecordReader.scala |  4 +++-
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 21 ++++++++++++++++++--
 6 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ee6aa70/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 60ee115..57f9faf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 
@@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging {
       Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
     statisticsDataClass.getDeclaredMethod(methodName)
   }
+
+  /**
+   * Uses reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly
+   * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different bytecode
+   * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is a class in Hadoop 1.+
+   * while it's an interface in Hadoop 2.+.
+   */
+  def getConfigurationFromJobContext(context: JobContext): Configuration = {
+    val method = context.getClass.getMethod("getConfiguration")
+    method.invoke(context).asInstanceOf[Configuration]
+  }
 }
 
 object SparkHadoopUtil {

http://git-wip-us.apache.org/repos/asf/spark/blob/6ee6aa70/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
index 89b29af..c219d21 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * Custom Input Format for reading and splitting flat binary files that contain records,
@@ -33,7 +34,7 @@ private[spark] object FixedLengthBinaryInputFormat {
 
   /** Retrieves the record length property from a Hadoop configuration */
   def getRecordLength(context: JobContext): Int = {
-    context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ee6aa70/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
index 36a1e5d..67a9692 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.io.{BytesWritable, LongWritable}
 import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader
     // the actual file we will be reading from
     val file = fileSplit.getPath
     // job configuration
-    val job = context.getConfiguration
+    val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
     // check compression
     val codec = new CompressionCodecFactory(job).getCodec(file)
     if (codec != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6ee6aa70/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 4574725..593a62b 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
  * A general format for reading whole files in as streams, byte arrays,
@@ -145,7 +146,8 @@ class PortableDataStream(
 
   private val confBytes = {
     val baos = new ByteArrayOutputStream()
-    context.getConfiguration.write(new DataOutputStream(baos))
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context).
+      write(new DataOutputStream(baos))
     baos.toByteArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6ee6aa70/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 1b1131b..31bde8a 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 /**
@@ -51,7 +52,8 @@ private[spark] class WholeTextFileRecordReader(
   extends RecordReader[String, String] with Configurable {
 
   private[this] val path = split.getPath(index)
-  private[this] val fs = path.getFileSystem(context.getConfiguration)
+  private[this] val fs = path.getFileSystem(
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context))
 
   // True means the current file has been processed, then skip it.
   private[this] var processed = false

http://git-wip-us.apache.org/repos/asf/spark/blob/6ee6aa70/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 214f22b..a942069 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1174,7 +1174,20 @@ abstract class RDD[T: ClassTag](
    * Save this RDD as a text file, using string representations of elements.
    */
   def saveAsTextFile(path: String) {
-    this.map(x => (NullWritable.get(), new Text(x.toString)))
+    // https://issues.apache.org/jira/browse/SPARK-2075
+    //
+    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
+    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
+    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
+    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
+    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
+    //
+    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generates
+    // the same bytecode for `saveAsTextFile`.
+    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
+    val textClassTag = implicitly[ClassTag[Text]]
+    val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
   }
 
@@ -1182,7 +1195,11 @@ abstract class RDD[T: ClassTag](
    * Save this RDD as a compressed text file, using string representations of elements.
    */
   def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
-    this.map(x => (NullWritable.get(), new Text(x.toString)))
+    // https://issues.apache.org/jira/browse/SPARK-2075
+    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
+    val textClassTag = implicitly[ClassTag[Text]]
+    val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
   }
 

