Posted to user@spark.apache.org by Archit Thakur <ar...@gmail.com> on 2014/01/27 15:44:16 UTC

Problems while moving from 0.8.0 to 0.8.1

Hi,

The implementation of the aggregation logic changed in 0.8.1
(Aggregator.scala).

It now uses AppendOnlyMap instead of the java.util.HashMap used in the
0.8.0 release.

Aggregator.scala
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
    val combiners = new AppendOnlyMap[K, C]
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (iter.hasNext) {
      kv = iter.next()
      combiners.changeValue(kv._1, update)
    }
    combiners.iterator
  }

The problem I am facing is in the changeValue function of AppendOnlyMap.
It computes
val curKey = data(2 * pos)
which comes back as null and eventually results in an NPE.

AppendOnlyMap.scala
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else if (curKey.eq(null)) {
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()
        return newValue
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }


Other info:
1. My code works fine with 0.8.0.
2. I used groupByKey transformation.
3. I replaced Aggregator.scala with the older version (0.8.0), compiled
it, and restarted the Master and Worker; it then ran successfully.

Thanks and Regards,
Archit Thakur.

Re: Problems while moving from 0.8.0 to 0.8.1

Posted by Aaron Davidson <il...@gmail.com>.
The code would not call null.equals(b); only a.equals(null) is allowable
(we check for k.eq(null) at the top).

a.equals(null) returning false is part of the Object.equals() contract
<http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#equals(java.lang.Object)>,
so it is not invalid to use it; it just may be unintuitive if you expect
never to have null references of MyCustomKeyType hanging around.
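
To illustrate the contract point above, here is a minimal Java sketch of an
equals() that returns false for a null argument. The fields name and id are
assumptions made up for illustration; the real MyCustomKeyType is not shown
in this thread.

```java
// Hypothetical key type: the fields are illustrative assumptions, not the
// poster's actual class.
final class MyCustomKeyType {
    private final String name;
    private final int id;

    MyCustomKeyType(String name, int id) {
        this.name = name;
        this.id = id;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        // instanceof evaluates to false for null, so equals(null) returns
        // false here instead of throwing a NullPointerException.
        if (!(other instanceof MyCustomKeyType)) return false;
        MyCustomKeyType that = (MyCustomKeyType) other;
        return id == that.id && java.util.Objects.equals(name, that.name);
    }

    @Override
    public int hashCode() {
        // Kept consistent with equals(): equal keys produce equal hash codes.
        return 31 * java.util.Objects.hashCode(name) + id;
    }
}
```

With an equals() of this shape, a lookup that compares the key against an
empty (null) slot gets false back rather than an exception.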


On Mon, Jan 27, 2014 at 12:12 PM, Archit Thakur <ar...@gmail.com> wrote:

> On Tue, Jan 28, 2014 at 1:25 AM, Aaron Davidson <il...@gmail.com> wrote:
>
> > Looks like your MyCustomKeyType.equals() method doesn't correctly handle a
> > null argument. In general, the contract of equals is to return false if
> > called with a null argument, which this code currently relies on.
>
> OK, but what if equals is invoked on a null object, rather than being
> passed null as its argument? The code would internally be doing
> a.equals(b) somewhere where b is null. What if a happens to be null?
> Won't that give an NPE right there, or is there a null check for it?
> I mean, why would a null MyCustomKeyType exist at all, when it should
> hold some data? How could a MyCustomKeyType be null when I never stored
> null anywhere?
>
> > This could still be patched for the sake of backwards compatibility with
> > similarly incomplete equals() methods which previously worked.
>
> Agreed.
>
> Thanks and Regards,
> Archit Thakur.

Re: Problems while moving from 0.8.0 to 0.8.1

Posted by Archit Thakur <ar...@gmail.com>.
On Tue, Jan 28, 2014 at 1:25 AM, Aaron Davidson <il...@gmail.com> wrote:

> Looks like your MyCustomKeyType.equals() method doesn't correctly handle a
> null argument. In general, the contract of equals is to return false if
> called with a null argument, which this code currently relies on.
>
>
OK, but what if equals is invoked on a null object, rather than being
passed null as its argument? The code would internally be doing
a.equals(b) somewhere where b is null. What if a happens to be null?
Won't that give an NPE right there, or is there a null check for it?
I mean, why would a null MyCustomKeyType exist at all, when it should
hold some data? How could a MyCustomKeyType be null when I never stored
null anywhere?


> This could still be patched for the sake of backwards compatibility with
> similarly incomplete equals() methods which previously worked.
>
>
>
Agreed.

Thanks and Regards,
Archit Thakur.




Re: Problems while moving from 0.8.0 to 0.8.1

Posted by Aaron Davidson <il...@gmail.com>.
Looks like your MyCustomKeyType.equals() method doesn't correctly handle a
null argument. In general, the contract of equals is to return false if
called with a null argument, which this code currently relies on.

This could still be patched for the sake of backwards compatibility with
similarly incomplete equals() methods which previously worked.
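
As a rough sketch of what such a patch could look like (an assumption for
illustration, not the actual Spark change): if the probe loop tests for an
empty slot before calling equals(), then key.equals(null) is never invoked,
and an equals() that does not handle null still works. ProbeSketch and
FragileKey are hypothetical names. The sketch is in Java, stores keys only,
and omits the rehash step and table resizing.

```java
// Hypothetical stand-in for an open-addressing table like the one quoted in
// this thread. It is not Spark's AppendOnlyMap.
final class ProbeSketch {
    private final Object[] keys;
    private final int mask;

    // The capacity must be a power of two so that (capacity - 1) is a bit mask.
    ProbeSketch(int capacityPowerOfTwo) {
        keys = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    // Returns true if the key was newly inserted, false if it was present.
    // The caller must pass a non-null key and keep the table from filling up.
    boolean add(Object key) {
        int pos = key.hashCode() & mask;
        int i = 1;
        while (true) {
            Object cur = keys[pos];
            if (cur == null) {
                // Empty slot is checked first, so equals() never sees null.
                keys[pos] = key;
                return true;
            } else if (key == cur || key.equals(cur)) {
                return false;
            } else {
                // Same probing step as in the quoted changeValue loop.
                pos = (pos + i) & mask;
                i += 1;
            }
        }
    }
}

// Hypothetical key whose equals() dereferences its argument with no null
// check, similar in spirit to the failure in the stack trace.
final class FragileKey {
    final int id;

    FragileKey(int id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object other) {
        // Throws NullPointerException when other is null.
        return ((FragileKey) other).id == id;
    }

    @Override
    public int hashCode() {
        return id;
    }
}
```

With this ordering, adding FragileKey instances to a ProbeSketch does not
throw, whereas calling equals() before the empty-slot check would hit the
NullPointerException on the first empty slot.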


On Mon, Jan 27, 2014 at 11:34 AM, Archit Thakur <ar...@gmail.com> wrote:

> ERROR executor.Executor: Exception in task ID 20
> java.lang.NullPointerException
>         at com.xyz.spark.common.collection.MyCustomKeyType.equals(MyCustomKeyType.java:200)
>         at org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:122)
>         at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:103)
>         at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:102)
>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
>         at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>         at java.lang.Thread.run(Unknown Source)

Re: Problems while moving from 0.8.0 to 0.8.1

Posted by Archit Thakur <ar...@gmail.com>.
ERROR executor.Executor: Exception in task ID 20
java.lang.NullPointerException
        at com.xyz.spark.common.collection.MyCustomKeyType.equals(MyCustomKeyType.java:200)
        at org.apache.spark.util.AppendOnlyMap.changeValue(AppendOnlyMap.scala:122)
        at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:42)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:103)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$3.apply(PairRDDFunctions.scala:102)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
        at org.apache.spark.rdd.RDD$$anonfun$3.apply(RDD.scala:465)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)



On Mon, Jan 27, 2014 at 10:54 PM, Reynold Xin <rx...@databricks.com> wrote:

> Do you mind pasting the whole stack trace for the NPE?

Re: Problems while moving from 0.8.0 to 0.8.1

Posted by Reynold Xin <rx...@databricks.com>.
Do you mind pasting the whole stack trace for the NPE?


