Posted to commits@spark.apache.org by va...@apache.org on 2015/12/04 01:37:07 UTC

spark git commit: [SPARK-12056][CORE] Create a TaskAttemptContext only after calling setConf.

Repository: spark
Updated Branches:
  refs/heads/master 2213441e5 -> f434f36d5


[SPARK-12056][CORE] Create a TaskAttemptContext only after calling setConf.

TaskAttemptContext's constructor clones the configuration instead of referencing it. If setConf is called after the TaskAttemptContext has been created, any changes that setConf makes to the configuration are invisible to RecordReader instances.

As an example, Titan's InputFormat modifies conf when setConf is called. Titan wraps its InputFormat around Cassandra's ColumnFamilyInputFormat and appends Cassandra's configuration. This change fixes the following error when using Titan's CassandraInputFormat with Spark:

*java.lang.RuntimeException: org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_keyspace_args(keyspace:null)*

There's a discussion of this error here: https://groups.google.com/forum/#!topic/aureliusgraphs/4zpwyrYbGAE
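
The ordering problem described above can be illustrated with a minimal, self-contained sketch. It does not use the Hadoop API: Conf, AttemptContext, and WrappingFormat are hypothetical stand-ins for Configuration, TaskAttemptContext, and a Configurable InputFormat, and the "keyspace" key and its value are made up for the example. The only assumption carried over from the description is that the context copies the configuration when it is constructed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Hadoop's Configuration: a mutable key/value store.
final class Conf(initial: Map[String, String] = Map.empty) {
  private val entries = mutable.Map(initial.toSeq: _*)
  def set(key: String, value: String): Unit = entries.update(key, value)
  def get(key: String): Option[String] = entries.get(key)
  def snapshot: Map[String, String] = entries.toMap
}

// Hypothetical stand-in for TaskAttemptContext: it copies the configuration
// at construction time, so later changes to the original are not seen.
final class AttemptContext(conf: Conf) {
  private val copied = new Conf(conf.snapshot)
  def configuration: Conf = copied
}

// Hypothetical stand-in for a Configurable InputFormat that appends
// settings to the configuration inside setConf.
final class WrappingFormat {
  def setConf(conf: Conf): Unit = conf.set("keyspace", "graph")

  // Models a RecordReader, which only sees the context's configuration.
  def readKeyspace(context: AttemptContext): Option[String] =
    context.configuration.get("keyspace")
}

object OrderingDemo {
  // Old order: the context is created first, so it misses the setConf change.
  def contextFirst(): Option[String] = {
    val conf = new Conf()
    val context = new AttemptContext(conf)
    val format = new WrappingFormat
    format.setConf(conf)
    format.readKeyspace(context)
  }

  // New order: setConf runs first, so the context's copy includes the change.
  def setConfFirst(): Option[String] = {
    val conf = new Conf()
    val format = new WrappingFormat
    format.setConf(conf)
    val context = new AttemptContext(conf)
    format.readKeyspace(context)
  }

  def main(args: Array[String]): Unit = {
    println(s"context created before setConf: ${contextFirst()}")
    println(s"context created after setConf:  ${setConfFirst()}")
  }
}
```

In this model, contextFirst() returns None and setConfFirst() returns Some("graph"), which mirrors why the change creates the attempt context only after setConf has been called.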

Author: Anderson de Andrade <ad...@verticalscope.com>

Closes #10046 from adeandrade/newhadooprdd-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f434f36d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f434f36d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f434f36d

Branch: refs/heads/master
Commit: f434f36d508eb4dcade70871611fc022ae0feb56
Parents: 2213441
Author: Anderson de Andrade <ad...@verticalscope.com>
Authored: Thu Dec 3 16:37:00 2015 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Dec 3 16:37:00 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f434f36d/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index d196099..86f38ae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -138,14 +138,14 @@ class NewHadoopRDD[K, V](
       }
       inputMetrics.setBytesReadCallback(bytesReadCallback)
 
-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
       format match {
         case configurable: Configurable =>
           configurable.setConf(conf)
         case _ =>
       }
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

