Posted to user@spark.apache.org by zakhavan <za...@unm.edu> on 2018/10/09 19:46:10 UTC

Hello,

I'm trying to calculate the Pearson correlation between two DStreams using
a sliding window in PySpark, but I keep getting the following error:

Traceback (most recent call last):
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/examples/src/main/python/streaming/Cross-Corr.py",
line 63, in <module>
    result = Statistics.corr(windowedds1,windowedds2, method="pearson")
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/stat/_statistics.py",
line 157, in corr
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py",
line 130, in callMLlibFunc
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py",
line 122, in callJavaFunc
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/mllib/common.py",
line 87, in _py2java
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",
line 555, in dumps
  File
"/home/zeinab/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/context.py",
line 315, in __getnewargs__
Exception: It appears that you are attempting to reference SparkContext from
a broadcast variable, action, or transformation. SparkContext can only be
used on the driver, not in code that it run on workers. For more
information, see SPARK-5063.

The error comes from this line:

result = Statistics.corr(windowedds1,windowedds2, method="pearson")

First, I read the lines from two text files and load them into two Kafka
topics; then I apply a window operation on each DStream and calculate the
Pearson correlation between the two windows.
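In plain, non-streaming terms, what I want per window is just the sample
Pearson correlation between the two aligned sequences of float samples.
This local NumPy sketch (made-up sample values, not Spark code) shows the
per-window computation and the threshold check I'm after:

```python
import numpy as np

# Two aligned windows of samples (hypothetical data, for illustration only).
window1 = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
window2 = np.array([2.0, 4.1, 6.0, 8.2, 10.0])

# Pearson correlation coefficient between the two windows.
# np.corrcoef returns the 2x2 correlation matrix; [0, 1] is r(window1, window2).
r = np.corrcoef(window1, window2)[0, 1]

if r > 0.7:
    print("windows are correlated, r =", r)
```

The question is how to express this per-window step correctly on the
DStreams below without touching the SparkContext on the workers.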

Here is my code:

from __future__ import print_function
from future.builtins import *
from pyspark.ml.linalg import Vectors
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import time
from collections import deque
import sys
from operator import add
import numpy as np
from itertools import chain
import warnings
from obspy import UTCDateTime
from obspy.signal.cross_correlation import templates_max_similarity
from obspy import read

if __name__ == "__main__":
    print("hello spark")

    sc = SparkContext("local[2]", appName="CrossCorrelation")
    ssc = StreamingContext(sc, 5)
    broker, topic1, topic2 = sys.argv[1:]
    # Connect to Kafka

    kvs1 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation", {topic1: 1})
    kvs2 = KafkaUtils.createStream(ssc, broker, "real-time-cross-correlation", {topic2: 1})
    lines1 = kvs1.map(lambda x1: x1[1])
    ds1 = lines1.flatMap(lambda line1: line1.strip().split("\n")).map(lambda strelem1: float(strelem1))
    lines2 = kvs2.map(lambda x2: x2[1])
    ds2 = lines2.flatMap(lambda line2: line2.strip().split("\n")).map(lambda strelem2: float(strelem2))
    #Windowing
    windowedds1= ds1.window(10,5)
    windowedds2= ds2.window(10,5)
    #Correlation
    result = Statistics.corr(windowedds1,windowedds2, method="pearson")
    if result > 0.7:
        print("ds1 and ds2 are correlated!!!")

    ssc.start()
    ssc.awaitTermination()

Does anybody know what I'm doing wrong?

Thank you.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
