Posted to user@spark.apache.org by Gourav Sengupta <go...@gmail.com> on 2017/07/31 00:14:21 UTC

SPARK Issue in Standalone cluster

Hi,

I have set up a native SPARK standalone cluster (
https://spark.apache.org/docs/2.2.0/spark-standalone.html).

Therefore I do not have HDFS.


EXERCISE:
It's the most fundamental and simple exercise: create a sample SPARK
dataframe, write it to a location, and then read it back.

SETTINGS:
I have installed SPARK on two physical systems with the same:
1. SPARK version,
2. JAVA version,
3. PYTHON_PATH
4. SPARK_HOME
5. PYSPARK_PYTHON
The user on both systems is root, so there are no
permission issues anywhere.

I am able to start:
1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
computers)

After that I can see in the spark UI (at port 8080) two workers.


CODE:
Then I run the following code:

======================================================
import findspark
import os
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
        .master("spark://mastersystem.local:7077")
        .appName("gouravtest")
        .enableHiveSupport()
        .getOrCreate())
import pandas, numpy
testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(10000,
4), columns=list('ABCD')))
testdf.cache()
testdf.count()
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
======================================================


ERROR I (in above code):
ERROR in line:
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
This line does not fail or report any error. But when I look at the
stage in the Spark application UI, the error below is reported for the
worker node that is not on the same system as the master node. The
write on the worker node that is on the same physical system as the
master completes correctly. (NOTE: by slave node I mean the worker, and by
master node the driver.)
----------------------------------------------------------------------------------------------------------------------------------

0 (TID 41). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000006_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000006_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID
64). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000028_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000028_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000021_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000021_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID
45). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID
37). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID
39). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000018_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000018_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000029_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000029_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000027_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000027_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID
54). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000010_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010
17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID
52). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000010_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000030_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030
17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID
55). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000030_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID
53). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID
61). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000016_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000016_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID
59). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID
51). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000024_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000024_0: Committed
17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task
'attempt_20170731001928_0002_m_000023_0' to
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023
17/07/31 00:19:29 INFO SparkHadoopMapRedUtil:
attempt_20170731001928_0002_m_000023_0: Committed
17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID
62). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID
43). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID
49). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID
60). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID
63). 2103 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID
56). 2060 bytes result sent to driver
17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID
57). 2060 bytes result sent to driver
17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65
17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)
17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3
17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as
bytes in memory (estimated size 24.9 KB, free 365.9 MB)
17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 ms
17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values
in memory (estimated size 70.3 KB, free 365.9 MB)
17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
java.io.FileNotFoundException: File
file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
	at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
	at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
	at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
	at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
	at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
	at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
	at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
	at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66


----------------------------------------------------------------------------------------------------------------------------------


ERROR II  (in above code):
While trying to read the file there is now a distinct error thrown, which
again says that the files do not exist.

Also, why is SPARK trying to search for the same files on both systems?
If the same path on the two systems holds different files, should SPARK not
combine them and work on them?



NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
I started spark using the same method but now using SPARK 1.5 and this does
not give any error:
======================================================
import findspark
import os
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()
import pyspark

sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
sqlContext = pyspark.SQLContext(sc)
import pandas, numpy
testdf = sqlContext.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4),
columns=list('ABCD')))
testdf.cache()
testdf.count()
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test3")
sqlContext.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test3").count()
======================================================

I will be sincerely obliged if someone could kindly help me out with this
issue and point out my mistakes/ assumptions.




Regards,
Gourav Sengupta

Re: SPARK Issue in Standalone cluster

Posted by Sea aj <sa...@gmail.com>.
Hi everyone,

I have a huge dataframe with 1 billion rows, and each row is a nested list.
I want to train some ML models on this df, but because of its
size I get an out-of-memory error on one of my nodes when I run the fit
function.

Currently, my configuration is:
144 cores, 16 cores for each of the 8 nodes.
100 GB of RAM for each slave and 100 GB of RAM for the driver. I set
maxResultSize to 20 GB.

Do you have any suggestions?

I can think of splitting the data into multiple dataframes and then training
the model on each one individually but, besides the longer runtime, I learned
that the fit function overwrites the previous model each time I call it. Isn't
there a way to get the fit function to train the new model starting from
the previously trained model?
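
One way to picture "continuing from the previously trained model" is warm-start training, where each new chunk of data starts from the weights the previous chunk produced. The following is only a pure-Python sketch of that idea, not a Spark ML API: the function sgd_fit, its parameters, and the toy data are hypothetical, and whether a particular Spark estimator can accept an initial model depends on the algorithm.

```python
def sgd_fit(batch, w=0.0, lr=0.01, epochs=50):
    """Fit y ~ w * x by stochastic gradient descent, starting from weight w."""
    for _ in range(epochs):
        for x, y in batch:
            # Gradient of the squared error (w*x - y)**2 with respect to w.
            w -= lr * 2 * (w * x - y) * x
    return w


# Toy data generated from y = 2x, split into two chunks that are never
# held in memory together by the training loop.
data = [(x, 2.0 * x) for x in (0.5, 1.0, 1.5, 2.0, 2.5, 3.0)]
first_half, second_half = data[:3], data[3:]

w_first = sgd_fit(first_half)            # train on the first chunk only
w = sgd_fit(second_half, w=w_first)      # continue from the previous weight
print(w_first, w)
```

The second call does not start from scratch: it receives the weight learned from the first chunk and refines it, which is the behaviour the question asks for.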

Thanks





On Sun, Aug 6, 2017 at 11:04 PM, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi Marco,
>
> thanks a ton, I will surely use those alternatives.
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Aug 6, 2017 at 3:45 PM, Marco Mistroni <mm...@gmail.com>
> wrote:
>
>> Sengupta,
>>  further to this, if you try the following notebook in databricks cloud,
>> it will read a .csv file, write to a parquet file and read it again (just
>> to count the number of rows stored).
>> Please note that the path to the csv file might differ for you.
>> So, what you will need to do is:
>> 1 - create an account on community.cloud.databricks.com
>> 2 - upload the .csv file onto the Data of your databricks private cluster
>> 3 - run the script, which will store the data on the distributed
>> filesystem of the databricks cloud (dbfs)
>>
>> It's worth investing in this free databricks cloud as it can create a
>> cluster for you with minimal effort, and it's a very easy way to test your
>> spark scripts on a real cluster
>>
>> hope this helps
>> kr
>>
>> ##################################
>> from pyspark.sql import SQLContext
>>
>> from random import randint
>> from time import sleep
>> from pyspark.sql.session import SparkSession
>> import logging
>> logger = logging.getLogger(__name__)
>> logger.setLevel(logging.INFO)
>> ch = logging.StreamHandler()
>> logger.addHandler(ch)
>>
>>
>> import sys
>>
>> def read_parquet_file(parquetFileName):
>>   logger.info('Reading now the parquet files we just created...:%s',
>> parquetFileName)
>>   parquet_data = sqlContext.read.parquet(parquetFileName)
>>   logger.info('Parquet file has %s', parquet_data.count())
>>
>> def dataprocessing(filePath, count, sqlContext):
>>     logger.info( 'Iter count is:%s' , count)
>>     if count == 0:
>>         print('exiting')
>>     else:
>>         df_traffic_tmp = (sqlContext.read.format("csv")
>>                           .option("header", 'true').load(filePath))
>>         logger.info( '#############################DataSet has:%s' ,
>> df_traffic_tmp.count())
>>         logger.info('Writing to a parquet file')
>>         parquetFileName = "dbfs:/myParquetDf2.parquet"
>>         df_traffic_tmp.write.parquet(parquetFileName)
>>         sleepInterval = randint(10,100)
>>         logger.info( '#############################Sleeping for %s' ,
>> sleepInterval)
>>         sleep(sleepInterval)
>>         read_parquet_file(parquetFileName)
>>         dataprocessing(filePath, count-1, sqlContext)
>>
>> # This path might differ for you
>> filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'
>> iterations = 1
>> logger.info('----------------------')
>> logger.info('Filename:%s', filename)
>> logger.info('Iterations:%s', iterations )
>> logger.info('----------------------')
>>
>> logger.info ('Initializing sqlContext')
>> logger.info('........Starting spark..........Loading from %s for %s iterations',
>>             filename, iterations)
>> logger.info(  'Starting up....')
>> sc = SparkSession.builder.appName("Data Processsing").getOrCreate()
>> logger.info ('Initializing sqlContext')
>> sqlContext = SQLContext(sc.sparkContext)  # sc is a SparkSession; SQLContext expects a SparkContext
>> dataprocessing(filename, iterations, sqlContext)
>> logger.info('Out of here..')
>> ######################################
>>
>>
>> On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mm...@gmail.com>
>> wrote:
>>
>>> Uh believe me there are lots of ppl on this list who will send u code
>>> snippets if u ask... 😀
>>>
>>> Yes that is what Steve pointed out, suggesting also that for that simple
>>> exercise you should perform all operations on a spark standalone instead
>>> (or alt. Use an nfs on the cluster)
>>> I'd agree with his suggestion....
>>> I suggest u another alternative:
>>> https://community.cloud.databricks.com/
>>>
>>> That's a ready made cluster and you can run your spark app as well as store
>>> data on the cluster (well I haven't tried myself but I assume it's
>>> possible).   Try that out... I will try ur script there as I have an
>>> account there (though I guess you'll get there before me.....)
>>>
>>> Try that out and let me know if u get stuck....
>>> Kr
>>>
>>> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <go...@gmail.com>
>>> wrote:
>>>
>>>> Hi Marco,
>>>>
>>>> For the first time in several years, FOR THE VERY FIRST TIME, I am
>>>> seeing someone actually executing code and providing a response. It feels
>>>> wonderful that at least someone chose to respond by executing the
>>>> code, and did not just filter out each and every technical detail to brood
>>>> only on my superb social skills, while claiming that the reason for ignoring
>>>> the technical details is that they are elementary. I think that Steve is also the
>>>> first person who could answer the WHY of an elementary question, instead of
>>>> saying that is how it is, and who pointed to the correct documentation.
>>>>
>>>> That code works fantastically. But the problem I have been trying to
>>>> track down occurs while writing out the data, not while reading it.
>>>>
>>>>
>>>> If you try to read the data from a folder that holds the
>>>> same file on all the nodes, then it will work fine. In fact that is what
>>>> should work.
>>>>
>>>> What does not work is writing the file out and then
>>>> reading it back from the location you have written to; that is when the
>>>> issue starts happening.
>>>>
>>>> Therefore if in my code you were to save the pandas dataframe as a CSV
>>>> file and then read it then you will find the following observations:
>>>>
>>>> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
>>>> ------------------------------------------------------------
>>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>>> columns=list('ABCD'))
>>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>>> header=True, sep=",", index=0)
>>>> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>>>> testdf.cache()
>>>> testdf.count()
>>>> ------------------------------------------------------------
>>>>
>>>>
>>>> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN
>>>> WHICH THE DATA DOES NOT EXIST
>>>> ------------------------------------------------------------
>>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>>> columns=list('ABCD'))
>>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>>> header=True, sep=",", index=0)
>>>> testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>>>> testdf.cache()
>>>> testdf.count()
>>>> ------------------------------------------------------------
>>>>
>>>>
>>>> If you execute my code then you will, surprisingly, also see that the
>>>> writes on the nodes other than the master node do not finish moving
>>>> the files from the _temporary folder to the main one.
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>>
>>>> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mm...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello
>>>>>  please have a look at this. It's a simple script that just reads a
>>>>> dataframe n times, sleeping at random intervals. I used it to test memory
>>>>> issues that another user was experiencing on a spark cluster
>>>>>
>>>>> you should run it like this e.g
>>>>> spark-submit dataprocessing_Sample-2.py <path to tree_addhealth.csv>
>>>>> <num of iterations>
>>>>>
>>>>> i ran it on the cluster like this
>>>>>
>>>>> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077
>>>>> /root/pyscripts/dataprocessing_Sample-2.py file:///root/pyscripts/tree_addhealth.csv
>>>>>
>>>>> hth, ping me back if you have issues
>>>>> i do agree with Steve's comments.... if you want to test your spark
>>>>> scripts just for playing, do it on a standalone server on your localhost.
>>>>> Moving to a cluster is just a matter of deploying your script and making sure
>>>>> you have a common place where to read and store the data..... SysAdmin
>>>>> should give you this when they set up the cluster...
>>>>>
>>>>> kr
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <
>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>
>>>>>> Hi Marco,
>>>>>>
>>>>>> I am sincerely obliged for your kind time and response. Can you
>>>>>> please try the solution that you have so kindly suggested?
>>>>>>
>>>>>> It will be a lot of help if you could kindly execute the code that I
>>>>>> have given. I don't think that anyone has yet.
>>>>>>
>>>>>> There are lots of fine responses to my question here, but if you read
>>>>>> the last response from Simon, it comes the closest to being satisfactory. I
>>>>>> am sure even he did not execute the code, but at least he came quite close
>>>>>> to understanding what the problem is.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mm...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello
>>>>>>>  my 2 cents here, hope it helps
>>>>>>> If you just want to play around with Spark, i'd leave Hadoop out,
>>>>>>> it's an unnecessary dependency that you don't need for just running a python
>>>>>>> script
>>>>>>> Instead do the following:
>>>>>>> - go to the root of your master / slave node. create a directory
>>>>>>> /root/pyscripts
>>>>>>> - place your csv file there as well as the python script
>>>>>>> - run the script to replicate the whole directory across the
>>>>>>> cluster (i believe it's called copy-script.sh)
>>>>>>> - then run your spark-submit, it will be something like
>>>>>>>     ./spark-submit --master
>>>>>>> spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>>>>>> /root/pyscripts/mysparkscripts.py
>>>>>>> file:///root/pyscripts/tree_addhealth.csv 10
>>>>>>> - in your python script, as part of your processing, write the
>>>>>>> parquet file in directory /root/pyscripts
>>>>>>>
>>>>>>> If you have an AWS account and you are familiar with it - you
>>>>>>> need to set up bucket permissions etc - , you can just
>>>>>>> - store your file in one of your S3 buckets
>>>>>>> - create an EMR cluster
>>>>>>> - connect to master or slave
>>>>>>> - run your script that reads from the s3 bucket and writes to the
>>>>>>> same s3 bucket
>>>>>>>
>>>>>>>
>>>>>>> Feel free to mail me privately, i have a working script i have used
>>>>>>> to test some code on spark standalone cluster
>>>>>>> hth
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Steve,
>>>>>>>>
>>>>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>>>>
>>>>>>>> I am now going through the documentation (
>>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
>>>>>>>> and it makes much, much more sense now.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Gourav Sengupta
>>>>>>>>
>>>>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <
>>>>>>>> stevel@hortonworks.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <
>>>>>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Steve,
>>>>>>>>>
>>>>>>>>> I have written a sincere note of apology to everyone in a separate
>>>>>>>>> email. I sincerely request your kind forgiveness in advance if anything
>>>>>>>>> in my emails does sound impolite.
>>>>>>>>>
>>>>>>>>> Let me first start by thanking you.
>>>>>>>>>
>>>>>>>>> I know it looks like I formed all my opinion based on that
>>>>>>>>> document, but that is not the case at all. If you or anyone tries to
>>>>>>>>> execute the code that I have given then they will see what I mean. Code
>>>>>>>>> speaks louder and better than words for me.
>>>>>>>>>
>>>>>>>>> So I am not saying you are wrong. I am asking you to verify, and expecting
>>>>>>>>> that someone will be able to correct the understanding that a moron like
>>>>>>>>> me has gained after long hours of not having anything better to do.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored in
>>>>>>>>> HDFS with replication 2, and there is a HADOOP cluster of three nodes. All
>>>>>>>>> these nodes have SPARK workers (executors) running on them. The files are
>>>>>>>>> stored in the following way:
>>>>>>>>> ----------------------------------------
>>>>>>>>> | SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
>>>>>>>>> | (worker1)  | (worker2)  | (worker3)  |
>>>>>>>>> | (master)   |            |            |
>>>>>>>>> ----------------------------------------
>>>>>>>>> | file1.csv  |            | file1.csv  |
>>>>>>>>> ----------------------------------------
>>>>>>>>> |            | file2.csv  | file2.csv  |
>>>>>>>>> ----------------------------------------
>>>>>>>>> | file3.csv  | file3.csv  |            |
>>>>>>>>> ----------------------------------------
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>>>>> HDFS replication does not store the same file in all the nodes in
>>>>>>>>> the cluster. So if I have three nodes and the replication is two then the
>>>>>>>>> same file will be stored physically in two nodes in the cluster. Does that
>>>>>>>>> sound right?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file
>>>>>>>>> is > 128MB then it will be broken up into blocks
>>>>>>>>>
>>>>>>>>> file1.csv -> [block0001, block0002, block0003]
>>>>>>>>>
>>>>>>>>> and each block will be replicated. With replication = 2 there will
>>>>>>>>> be two copies of each block, but the file itself can span > 2 hosts.
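
As a rough illustration of the point above, that with replication = 2 a file's blocks can still span more than two hosts, here is a small sketch. The function place_blocks, the host names, and the round-robin placement are assumptions made for illustration only; the real HDFS placement policy also takes racks and free space into account.

```python
import math

BLOCK_SIZE_MB = 128  # HDFS default block size mentioned above


def place_blocks(file_size_mb, nodes, replication=2, block_size_mb=BLOCK_SIZE_MB):
    """Return {block_index: [nodes holding a replica]}.

    Uses simple round-robin placement (an assumption for illustration,
    not the actual HDFS policy).
    """
    n_blocks = max(1, math.ceil(file_size_mb / block_size_mb))
    placement = {}
    for b in range(n_blocks):
        placement[b] = [nodes[(b + r) % len(nodes)] for r in range(replication)]
    return placement


nodes = ["system1", "system2", "system3"]
layout = place_blocks(300, nodes)  # a 300 MB file needs 3 blocks of up to 128 MB
hosts = sorted({n for replicas in layout.values() for n in replicas})

for block, replicas in layout.items():
    print(block, replicas)
print("hosts holding part of the file:", hosts)
```

Each block has exactly two replicas, yet the file as a whole touches all three systems, so no single node necessarily holds the complete file.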
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>>>>>>>>> If SPARK is trying to process the records then I am expecting
>>>>>>>>> that WORKER2 should not be processing file1.csv, and similarly WORKER1
>>>>>>>>> should not be processing file2.csv and WORKER3 should not be processing
>>>>>>>>> file3.csv. Because if WORKER2 were trying to process file1.csv then it
>>>>>>>>> would cause unnecessary network transmission of the file.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>>>>>> traffic, but it prioritizes execution time over waiting for a free worker
>>>>>>>>> on the node with the data. If a block is on nodes 2 and 3 but there is only
>>>>>>>>> a free thread on node 1, then node 1 gets the work.
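
The scheduling preference described above can be sketched as a toy function. This is not Spark's scheduler: pick_node and the node names are hypothetical, and the real scheduler also waits a configurable time (spark.locality.wait) before giving up on a local slot.

```python
def pick_node(block_replicas, free_nodes):
    """Prefer a free node that holds the block; otherwise take any free node."""
    for node in free_nodes:
        if node in block_replicas:
            return node, "local"
    if free_nodes:
        # No free node has the data, so the block is read over the network.
        return free_nodes[0], "remote"
    return None, "wait"


# Block lives on nodes 2 and 3, but only node 1 has a free thread.
print(pick_node(["node2", "node3"], ["node1"]))
# Same block, and node 3 is free as well: the local node wins.
print(pick_node(["node2", "node3"], ["node1", "node3"]))
```

In the first call the work goes to node1 even though it holds no replica, which is why every worker needs to be able to reach the data.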
>>>>>>>>>
>>>>>>>>> There are details on whether/how work across blocks takes place
>>>>>>>>> which I'm avoiding. For now, know that those formats which are "splittable" will
>>>>>>>>> have work scheduled by block. If you use Parquet/ORC/avro for your data and
>>>>>>>>> compress with snappy, it will be split. This gives you maximum performance
>>>>>>>>> as >1 thread can work on different blocks. That is, if file1 is split into
>>>>>>>>> three blocks, three worker threads can process it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE
>>>>>>>>> CLARIFY THIS):
>>>>>>>>> if WORKER 2 is not processing file1.csv then why does it matter
>>>>>>>>> whether the file is on that system at all? Should SPARK not
>>>>>>>>> just ask the workers to process the files which are available on the worker
>>>>>>>>> nodes? In case both WORKER2 and WORKER3 fail and are not available then
>>>>>>>>> file2.csv will not be processed at all.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> locality is best-effort, not guaranteed.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE
>>>>>>>>> SHOULD BE EXECUTED (It's been pointed out that I am learning SPARK, and even
>>>>>>>>> I did not take more than 13 mins to set up the cluster and run the code).
>>>>>>>>>
>>>>>>>>> Once you execute the code then you will find that:
>>>>>>>>> 1.  if the path starts with file:/// while reading back then
>>>>>>>>> there is no error reported, but the records reported back are
>>>>>>>>> only those on the worker which is on the same system as the master.
>>>>>>>>> 2. also you will notice that once you cache the dataframe before
>>>>>>>>> writing, the partitions are distributed nicely across the workers, and while
>>>>>>>>> writing back, the dataframe partitions are written properly by the worker
>>>>>>>>> on the Master node, but the workers on the other system have their files
>>>>>>>>> written in the _temporary folder, and these do not get copied back to the main
>>>>>>>>> folder. In spite of this the job is not reported as failed in SPARK.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This gets into the "commit protocol". You don't want to know all
>>>>>>>>> the dirty details (*) but essentially it's this:
>>>>>>>>>
>>>>>>>>> 1. Every worker writes its output to a directory under the
>>>>>>>>> destination directory, something like
>>>>>>>>> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>>>>>>>> 2. It is the Spark driver which "commits" the job by moving the
>>>>>>>>> output from the individual workers from the temporary directories into
>>>>>>>>> $dest, then deleting $dest/_temporary
>>>>>>>>> 3. For which it needs to be able to list all the output in
>>>>>>>>> $dest/_temporary
>>>>>>>>>
>>>>>>>>> In your case, only the output on the same node as the driver is
>>>>>>>>> being committed, because only those files can be listed and moved. The
>>>>>>>>> output on the other nodes isn't seen, so isn't committed, nor cleaned up.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Now in my own world, if I see the following things happening,
>>>>>>>>> something is going wrong (with me):
>>>>>>>>> 1. SPARK transfers files from different systems to process,
>>>>>>>>> instead of processing them locally (I do not have code to prove this, and
>>>>>>>>> therefore it's just an assumption)
>>>>>>>>> 2. SPARK cannot determine when the writes are failing in
>>>>>>>>> standalone cluster workers and reports success (code is there for this)
>>>>>>>>> 3. SPARK reports back the number of records in the worker running on
>>>>>>>>> the master node when count() is given, without reporting an error, while
>>>>>>>>> using file:/// and reports an error when I mention the path
>>>>>>>>> without file:/// (for SPARK 2.1.x onwards, code is there for this)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> As everyone's been saying, file:// requires a shared filestore,
>>>>>>>>> with uniform paths everywhere. That's needed to list the files to process,
>>>>>>>>> read the files in the workers and commit the final output. NFS
>>>>>>>>> cross-mounting is the simplest way to do this, especially as for three
>>>>>>>>> nodes HDFS is overkill: more services to keep running, no real fault
>>>>>>>>> tolerance. Export a directory tree from one of the servers, give the rest
>>>>>>>>> access to it, don't worry about bandwidth use as the shared disk itself
>>>>>>>>> will become the bottleneck.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I very sincerely hope with your genuine help the bar of language
>>>>>>>>> and social skills will be lowered for me. And everyone will find a way to
>>>>>>>>> excuse me and not qualify this email as a means to measure my extremely
>>>>>>>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>>>>>>>> just focus on the facts related to machines, data, error and (the language
>>>>>>>>> that I somehow understand better) code.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> My sincere apologies once again, as I am 100% sure that I did not
>>>>>>>>> meet the required social and language skills.
>>>>>>>>>
>>>>>>>>> Thanks a ton once again for your kindness, patience and
>>>>>>>>> understanding.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Gourav Sengupta
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> * for the curious, the details of the v1 and v2 commit protocols
>>>>>>>>> are
>>>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>>>>>>
>>>>>>>>> Like I said: you don't want to know the details, and you really
>>>>>>>>> don't want to step through Hadoop's FileOutputCommitter to see what's going
>>>>>>>>> on. The Spark side is much easier to follow.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
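
The commit steps Steve lists in the quoted reply above can be tried out on a
single machine. What follows is only a rough sketch in plain Python, under the
assumption that two local directories stand in for the disks of two separate
systems. The function names (write_task_output, commit_job, simulate) are made
up for illustration and are not Spark or Hadoop APIs, and the real
FileOutputCommitter does a good deal more than this.

```python
import os
import shutil
import tempfile


def write_task_output(node_root, dest, task_id, rows):
    """A worker writes its partition under $dest/_temporary/... on its OWN disk."""
    task_dir = os.path.join(node_root, dest, "_temporary", "0", "_temporary", task_id)
    os.makedirs(task_dir)
    with open(os.path.join(task_dir, "part-" + task_id), "w") as out:
        out.write("\n".join(rows))


def commit_job(driver_root, dest):
    """The driver lists $dest/_temporary on the disk IT can see, moves what it
    finds into $dest, then deletes $dest/_temporary."""
    final_dir = os.path.join(driver_root, dest)
    temp_dir = os.path.join(final_dir, "_temporary")
    pending = []
    for folder, _subdirs, files in os.walk(temp_dir):
        pending.extend(os.path.join(folder, name) for name in files)
    for path in pending:
        shutil.move(path, os.path.join(final_dir, os.path.basename(path)))
    shutil.rmtree(temp_dir, ignore_errors=True)
    return sorted(os.path.basename(path) for path in pending)


def simulate(shared):
    """Run one job with two tasks; `shared` mimics an NFS-style shared directory."""
    base = tempfile.mkdtemp()
    try:
        node1 = os.path.join(base, "node1")  # driver + worker 1
        node2 = node1 if shared else os.path.join(base, "node2")  # worker 2
        write_task_output(node1, "out", "00000", ["a", "b"])
        write_task_output(node2, "out", "00001", ["c", "d"])
        committed = commit_job(node1, "out")
        # Is there an orphaned _temporary folder left on worker 2's disk?
        leftover = os.path.isdir(os.path.join(node2, "out", "_temporary"))
        return committed, leftover
    finally:
        shutil.rmtree(base)


if __name__ == "__main__":
    print("separate disks:", simulate(shared=False))
    print("shared disk:   ", simulate(shared=True))
```

With separate "disks" the driver commits only its own part file and the other
node keeps an orphaned _temporary folder, which matches what was observed in
this thread. With a shared directory (as an NFS mount would give) both parts
are committed and nothing is left behind.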

Re: SPARK Issue in Standalone cluster

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Marco,

thanks a ton, I will surely use those alternatives.


Regards,
Gourav Sengupta

On Sun, Aug 6, 2017 at 3:45 PM, Marco Mistroni <mm...@gmail.com> wrote:

> Sengupta
>  further to this, if you try the following notebook in databricks cloud,
> it will read a .csv file, write to a parquet file and read it again (just
> to count the number of rows stored)
> Please note that the path to the csv file might differ for you.....
> So, what you will need to do is
> 1 - create an account on community.cloud.databricks.com
> 2 - upload the .csv file onto the Data of your databricks private cluster
> 3 - run the script. That will store the data on the distributed
> filesystem of the databricks cloud (dbfs)
>
> It's worth investing in this free databricks cloud as it can create a
> cluster for you with minimal effort, and it's  a very easy way to test your
> spark scripts on a real cluster
>
> hope this helps
> kr
>
> ##################################
> import logging
> from random import randint
> from time import sleep
>
> from pyspark.sql import SQLContext
> from pyspark.sql.session import SparkSession
>
> logger = logging.getLogger(__name__)
> logger.setLevel(logging.INFO)
> ch = logging.StreamHandler()
> logger.addHandler(ch)
>
>
> def read_parquet_file(parquetFileName, sqlContext):
>     # Read the parquet output back and log how many rows it holds
>     logger.info('Reading now the parquet files we just created...:%s',
>                 parquetFileName)
>     parquet_data = sqlContext.read.parquet(parquetFileName)
>     logger.info('Parquet file has %s', parquet_data.count())
>
>
> def dataprocessing(filePath, count, sqlContext):
>     # Read the csv, write it as parquet, read it back; repeat `count` times
>     logger.info('Iter count is:%s', count)
>     if count == 0:
>         print('exiting')
>     else:
>         df_traffic_tmp = (sqlContext.read.format("csv")
>                           .option("header", 'true').load(filePath))
>         logger.info('#############################DataSet has:%s',
>                     df_traffic_tmp.count())
>         logger.info('Writing to a parquet file')
>         parquetFileName = "dbfs:/myParquetDf2.parquet"
>         # overwrite, so a second iteration or a re-run does not fail
>         # because the path already exists
>         df_traffic_tmp.write.mode("overwrite").parquet(parquetFileName)
>         sleepInterval = randint(10, 100)
>         logger.info('#############################Sleeping for %s',
>                     sleepInterval)
>         sleep(sleepInterval)
>         read_parquet_file(parquetFileName, sqlContext)
>         dataprocessing(filePath, count - 1, sqlContext)
>
>
> # This path might differ for you
> filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'
> iterations = 1
> logger.info('----------------------')
> logger.info('Filename:%s', filename)
> logger.info('Iterations:%s', iterations)
> logger.info('----------------------')
>
> logger.info('........Starting spark..........Loading from %s for %s iterations',
>             filename, iterations)
> logger.info('Starting up....')
> spark = SparkSession.builder.appName("Data Processing").getOrCreate()
> logger.info('Initializing sqlContext')
> # SQLContext expects the SparkContext, not the SparkSession itself
> sqlContext = SQLContext(spark.sparkContext)
> dataprocessing(filename, iterations, sqlContext)
> logger.info('Out of here..')
> ######################################
>
>
> On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mm...@gmail.com>
> wrote:
>
>> Uh believe me there are lots of ppl on this list who will send u code
>> snippets if u ask... 😀
>>
>> Yes that is what Steve pointed out, suggesting also that for that simple
>> exercise you should perform all operations on a spark standalone instead
>> (or alt. Use an nfs on the cluster)
>> I'd agree with his suggestion....
>> I suggest u another alternative:
>> https://community.cloud.databricks.com/
>>
>> That's a ready made cluster and you can run your spark app as well as store
>> data on the cluster (well I haven't tried myself but I assume it's
>> possible).   Try that out... I will try ur script there as I have an
>> account there (though I guess you'll get there before me.....)
>>
>> Try that out and let me know if u get stuck....
>> Kr
>>
>> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <go...@gmail.com>
>> wrote:
>>
>>> Hi Marco,
>>>
>>> For the first time in several years, FOR THE VERY FIRST TIME, I am seeing
>>> someone actually executing code and providing a response. It feels wonderful
>>> that at least someone cared to respond by executing code and did not
>>> just filter out each and every technical detail to brood only on my
>>> superb social skills, while claiming the reason for ignoring technical
>>> details is that it is elementary. I think that Steve also is the first person
>>> who could answer the WHY of an elementary question instead of saying that
>>> is how it is, and who pointed to the correct documentation.
>>>
>>> That code works fantastically. But the problem which I have tried to
>>> point out occurs while writing out the data and not while reading it.
>>>
>>>
>>> So if you try to read the data from the same folder which has the
>>> same file across all the nodes then it will work fine. In fact that is what
>>> should work.
>>>
>>> What does not work is when you try to write back the file and then
>>> read it once again from the location you have written to; that is when the
>>> issue starts happening.
>>>
>>> Therefore if in my code you were to save the pandas dataframe as a CSV
>>> file and then read it then you will find the following observations:
>>>
>>> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
>>> ------------------------------------------------------------
>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>>                             columns=list('ABCD'))
>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>>                 header=True, sep=",", index=0)
>>> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>>> testdf.cache()
>>> testdf.count()
>>> ------------------------------------------------------------
>>>
>>>
>>> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN
>>> WHICH THE DATA DOES NOT EXIST
>>> ------------------------------------------------------------
>>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>>                             columns=list('ABCD'))
>>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>>                 header=True, sep=",", index=0)
>>> testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>>> testdf.cache()
>>> testdf.count()
>>> ------------------------------------------------------------
>>>
>>>
>>> if you execute my code then also you will surprisingly see that the
>>> writes on the nodes other than the master node do not complete moving
>>> the files from the _temporary folder to the main one.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>>
>>> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mm...@gmail.com>
>>> wrote:
>>>
>>>> Hello
>>>>  please have a look at this. it's a simple script that just reads a
>>>> dataframe n times, sleeping at random intervals. i used it to test memory
>>>> issues that another user was experiencing on a spark cluster
>>>>
>>>> you should run it like this e.g
>>>> spark-submit dataprocessing_Sample-2.py <path to tree_addhealth.csv>
>>>> <num of iterations>
>>>>
>>>> i ran it on the cluster like this
>>>>
>>>> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077 \
>>>>     /root/pyscripts/dataprocessing_Sample-2.py \
>>>>     file:///root/pyscripts/tree_addhealth.csv
>>>>
>>>> hth, ping me back if you have issues
>>>> i do agree with Steve's comments.... if you want to test your spark
>>>> scripts just for playing, do it on a standalone server on your localhost.
>>>> Moving to a cluster is just a matter of deploying your script and making sure
>>>> you have a common place where to read and store the data..... SysAdmin
>>>> should give you this when they setup the cluster...
>>>>
>>>> kr
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <
>>>> gourav.sengupta@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> I am sincerely obliged for your kind time and response. Can you please
>>>>> try the solution that you have so kindly suggested?
>>>>>
>>>>> It will be a lot of help if you could kindly execute the code that I
>>>>> have given. I don't think that anyone has yet.
>>>>>
>>>>> There are lots of fine responses to my question here, but if you read
>>>>> the last response from Simon, it comes the closest to being satisfactory. I
>>>>> am sure even he did not execute the code, but at least he came quite close
>>>>> to understanding what the problem is.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>>
>>>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mm...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello
>>>>>>  my 2 cents here, hope it helps
>>>>>> If you just want to play around with Spark, i'd leave Hadoop out,
>>>>>> it's an unnecessary dependency that you don't need for just running a python
>>>>>> script
>>>>>> Instead do the following:
>>>>>> - go to the root of your master / slave node. create a directory
>>>>>> /root/pyscripts
>>>>>> - place your csv file there as well as the python script
>>>>>> - run the script to replicate the whole directory across the cluster
>>>>>> (i believe it's called copy-script.sh)
>>>>>> - then run your spark-submit, it will be something like
>>>>>>     ./spark-submit --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077 \
>>>>>>         /root/pyscripts/mysparkscripts.py \
>>>>>>         file:///root/pyscripts/tree_addhealth.csv 10
>>>>>> - in your python script, as part of your processing, write the
>>>>>> parquet file in directory /root/pyscripts
>>>>>>
>>>>>> If you have an AWS account and you are versatile with that - you need
>>>>>> to setup bucket permissions etc - , you can just
>>>>>> - store your file in one of your S3 buckets
>>>>>> - create an EMR cluster
>>>>>> - connect to master or slave
>>>>>> - run your script that reads from the s3 bucket and writes to the
>>>>>> same s3 bucket
>>>>>>
>>>>>>
>>>>>> Feel free to mail me privately, i have a working script i have used
>>>>>> to test some code on spark standalone cluster
>>>>>> hth
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Steve,
>>>>>>>
>>>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>>>
>>>>>>> I am now going through the documentation (
>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
>>>>>>> and it makes much much more sense now.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Gourav Sengupta
>>>>>>>
>>>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <
>>>>>>> stevel@hortonworks.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <go...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Steve,
>>>>>>>>
>>>>>>>> I have written a sincere note of apology to everyone in a separate
>>>>>>>> email. I sincerely request your kind forgiveness in advance if anything
>>>>>>>> does sound impolite in my emails.
>>>>>>>>
>>>>>>>> Let me first start by thanking you.
>>>>>>>>
>>>>>>>> I know it looks like I formed all my opinion based on that
>>>>>>>> document, but that is not the case at all. If you or anyone tries to
>>>>>>>> execute the code that I have given then they will see what I mean. Code
>>>>>>>> speaks louder and better than words for me.
>>>>>>>>
>>>>>>>> So I am not saying you are wrong. I am asking you to verify, and expecting
>>>>>>>> that someone will be able to correct a set of understandings that a moron like
>>>>>>>> me has gained after long hours of not having anything better to do.
>>>>>>>>
>>>>>>>>
>>>>>>>> SCENARIO: there are three files file1.csv, file2.csv and file3.csv
>>>>>>>> stored in HDFS with replication 2 and there is a HADOOP cluster of three
>>>>>>>> nodes. All these nodes have SPARK workers (executors) running in them.
>>>>>>>> The files are stored in the following way:
>>>>>>>> -------------------------------------
>>>>>>>> | SYSTEM 1  | SYSTEM 2  | SYSTEM 3  |
>>>>>>>> | (worker1) | (worker2) | (worker3) |
>>>>>>>> | (master)  |           |           |
>>>>>>>> -------------------------------------
>>>>>>>> | file1.csv |           | file1.csv |
>>>>>>>> -------------------------------------
>>>>>>>> |           | file2.csv | file2.csv |
>>>>>>>> -------------------------------------
>>>>>>>> | file3.csv | file3.csv |           |
>>>>>>>> -------------------------------------
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>>>> HDFS replication does not store the same file in all the nodes in
>>>>>>>> the cluster. So if I have three nodes and the replication is two then the
>>>>>>>> same file will be stored physically in two nodes in the cluster. Does that
>>>>>>>> sound right?
>>>>>>>>
>>>>>>>>
>>>>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file
>>>>>>>> is > 128MB then it will be broken up into blocks
>>>>>>>>
>>>>>>>> file1.csv -> [block0001, block0002, block0003]
>>>>>>>>
>>>>>>>> and each block will be replicated. With replication = 2 there will
>>>>>>>> be two copies of each block, but the file itself can span > 2 hosts.
>>>>>>>>
>>>>>>>>
>>>>>>>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>>>>>>>> If SPARK is trying to process the records then I am expecting
>>>>>>>> that WORKER2 should not be processing file1.csv, and similarly WORKER 1
>>>>>>>> should not be processing file2.csv and WORKER3 should not be processing
>>>>>>>> file3.csv. Because in case WORKER2 was trying to process file1.csv then it
>>>>>>>> would actually cause network transmission of the file unnecessarily.
>>>>>>>>
>>>>>>>>
>>>>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>>>>> traffic, but it favours starting work promptly over waiting for a free
>>>>>>>> worker on the node with the data. If a block is on nodes 2 and 3 but the
>>>>>>>> only free thread is on node 1, then node 1 gets the work.
>>>>>>>>
>>>>>>>> There are details on whether/how work across blocks takes place which
>>>>>>>> I'm avoiding. For now, know that formats which are "splittable" will have
>>>>>>>> work scheduled by block. If you use Parquet/ORC/Avro for your data and
>>>>>>>> compress with snappy, it will be split. This gives you maximum performance
>>>>>>>> as >1 thread can work on different blocks. That is, if file1 is split into
>>>>>>>> three blocks, three worker threads can process it.
>>>>>>>>
>>>>>>>>
>>>>>>>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE
>>>>>>>> CLARIFY THIS):
>>>>>>>> if WORKER 2 is not processing file1.csv then how does it matter
>>>>>>>> whether the file is there or not at all in the system? Should not SPARK
>>>>>>>> just ask the workers to process the files which are available in the worker
>>>>>>>> nodes? In case both WORKER2 and WORKER3 fail and are not available then
>>>>>>>> file2.csv will not be processed at all.
>>>>>>>>
>>>>>>>>
>>>>>>>> locality is best-effort, not guaranteed.
>>>>>>>>
>>>>>>>>
>>>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD
>>>>>>>> BE EXECUTED (It's been pointed out that I am learning SPARK, and even I did
>>>>>>>> not take more than 13 mins to set up the cluster and run the code).
>>>>>>>>
>>>>>>>> Once you execute the code then you will find that:
>>>>>>>> 1. if the path starts with file:/// while reading back then there
>>>>>>>> is no error reported, but the number of records reported back is only
>>>>>>>> the records on the worker which runs on the same system as the master.
>>>>>>>> 2. also you will notice that once you cache the file before writing,
>>>>>>>> the partitions are distributed nicely across the workers, and while writing
>>>>>>>> back, the dataframe partitions do get written properly on the worker node on
>>>>>>>> the Master system, but the workers on the other system have the files written
>>>>>>>> in the _temporary folder, which do not get copied back to the main folder.
>>>>>>>> In spite of this the job is not reported as failed in SPARK.
>>>>>>>>
>>>>>>>>
>>>>>>>> This gets into the "commit protocol". You don't want to know all
>>>>>>>> the dirty details (*) but essentially it's this:
>>>>>>>>
>>>>>>>> 1. Every worker writes its output to a directory under the
>>>>>>>> destination directory, something like
>>>>>>>> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>>>>>>> 2. It is the Spark driver which "commits" the job by moving the
>>>>>>>> output from the individual workers from the temporary directories into
>>>>>>>> $dest, then deleting $dest/_temporary
>>>>>>>> 3. For which it needs to be able to list all the output in
>>>>>>>> $dest/_temporary
>>>>>>>>
>>>>>>>> In your case, only the output on the same node as the driver is
>>>>>>>> being committed, because only those files can be listed and moved. The
>>>>>>>> output on the other nodes isn't seen, so isn't committed, nor cleaned up.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Now in my own world, if I see the following things happening,
>>>>>>>> something is going wrong (with me):
>>>>>>>> 1. SPARK transfers files from different systems to process, instead
>>>>>>>> of processing them locally (I do not have code to prove this, and therefore
>>>>>>>> it's just an assumption)
>>>>>>>> 2. SPARK cannot determine when the writes are failing in standalone
>>>>>>>> cluster workers and reports success (code is there for this)
>>>>>>>> 3. SPARK reports back the number of records in the worker running on
>>>>>>>> the master node when count() is given, without reporting an error, while
>>>>>>>> using file:/// and reports an error when I mention the path
>>>>>>>> without file:/// (for SPARK 2.1.x onwards, code is there for this)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> As everyone's been saying, file:// requires a shared filestore, with
>>>>>>>> uniform paths everywhere. That's needed to list the files to process, read
>>>>>>>> the files in the workers and commit the final output. NFS cross-mounting is
>>>>>>>> the simplest way to do this, especially as for three nodes HDFS is
>>>>>>>> overkill: more services to keep running, no real fault tolerance. Export a
>>>>>>>> directory tree from one of the servers, give the rest access to it, don't
>>>>>>>> worry about bandwidth use as the shared disk itself will become the
>>>>>>>> bottleneck.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I very sincerely hope with your genuine help the bar of language
>>>>>>>> and social skills will be lowered for me. And everyone will find a way to
>>>>>>>> excuse me and not qualify this email as a means to measure my extremely
>>>>>>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>>>>>>> just focus on the facts related to machines, data, error and (the language
>>>>>>>> that I somehow understand better) code.
>>>>>>>>
>>>>>>>>
>>>>>>>> My sincere apologies once again, as I am 100% sure that I did not
>>>>>>>> meet the required social and language skills.
>>>>>>>>
>>>>>>>> Thanks a ton once again for your kindness, patience and
>>>>>>>> understanding.
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Gourav Sengupta
>>>>>>>>
>>>>>>>>
>>>>>>>> * for the curious, the details of the v1 and v2 commit protocols are
>>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>>>>>
>>>>>>>> Like I said: you don't want to know the details, and you really
>>>>>>>> don't want to step through Hadoop's FileOutputCommitter to see what's going
>>>>>>>> on. The Spark side is much easier to follow.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
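
The locality rule from the quoted reply above (a block on nodes 2 and 3, the
only free thread on node 1, so node 1 gets the work) can be illustrated with a
toy function. This is a simplified sketch and not Spark's scheduler: pick_node
and the "WAIT" result are hypothetical names chosen here, and the sketch leaves
out the short delay Spark applies before giving up on a local slot (the
spark.locality.wait setting).

```python
def pick_node(block_replicas, free_nodes):
    """Best-effort locality: prefer a free node that holds a replica of the
    block, otherwise fall back to any free node rather than waiting."""
    local = [node for node in free_nodes if node in block_replicas]
    if local:
        return local[0], "NODE_LOCAL"
    if free_nodes:
        # No free slot next to the data: the block is read over the network.
        return free_nodes[0], "ANY"
    return None, "WAIT"  # hypothetical marker: nothing is free at all


if __name__ == "__main__":
    replicas = {"node2", "node3"}
    print(pick_node(replicas, ["node1"]))           # only node1 is free
    print(pick_node(replicas, ["node1", "node3"]))  # a replica holder is free
```

This is also why a file has to be readable from every worker: any node may be
handed the task, not only the ones that happen to hold a copy.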

Re: SPARK Issue in Standalone cluster

Posted by Marco Mistroni <mm...@gmail.com>.
Sengupta
 further to this, if you try the following notebook in databricks cloud, it
will read a .csv file, write to a parquet file and read it again (just to
count the number of rows stored)
Please note that the path to the csv file might differ for you.....
So, what you will need to do is
1 - create an account on community.cloud.databricks.com
2 - upload the .csv file onto the Data of your databricks private cluster
3 - run the script. That will store the data on the distributed filesystem
of the databricks cloud (dbfs)

It's worth investing in this free databricks cloud as it can create a
cluster for you with minimal effort, and it's  a very easy way to test your
spark scripts on a real cluster

hope this helps
kr

##################################
import logging
from random import randint
from time import sleep

from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)


def read_parquet_file(parquetFileName, sqlContext):
    # Read the parquet output back and log how many rows it holds
    logger.info('Reading now the parquet files we just created...:%s',
                parquetFileName)
    parquet_data = sqlContext.read.parquet(parquetFileName)
    logger.info('Parquet file has %s', parquet_data.count())


def dataprocessing(filePath, count, sqlContext):
    # Read the csv, write it as parquet, read it back; repeat `count` times
    logger.info('Iter count is:%s', count)
    if count == 0:
        print('exiting')
    else:
        df_traffic_tmp = (sqlContext.read.format("csv")
                          .option("header", 'true').load(filePath))
        logger.info('#############################DataSet has:%s',
                    df_traffic_tmp.count())
        logger.info('Writing to a parquet file')
        parquetFileName = "dbfs:/myParquetDf2.parquet"
        # overwrite, so a second iteration or a re-run does not fail
        # because the path already exists
        df_traffic_tmp.write.mode("overwrite").parquet(parquetFileName)
        sleepInterval = randint(10, 100)
        logger.info('#############################Sleeping for %s',
                    sleepInterval)
        sleep(sleepInterval)
        read_parquet_file(parquetFileName, sqlContext)
        dataprocessing(filePath, count - 1, sqlContext)


# This path might differ for you
filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'
iterations = 1
logger.info('----------------------')
logger.info('Filename:%s', filename)
logger.info('Iterations:%s', iterations)
logger.info('----------------------')

logger.info('........Starting spark..........Loading from %s for %s iterations',
            filename, iterations)
logger.info('Starting up....')
spark = SparkSession.builder.appName("Data Processing").getOrCreate()
logger.info('Initializing sqlContext')
# SQLContext expects the SparkContext, not the SparkSession itself
sqlContext = SQLContext(spark.sparkContext)
dataprocessing(filename, iterations, sqlContext)
logger.info('Out of here..')
######################################


On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mm...@gmail.com> wrote:

> Uh believe me there are lots of ppl on this list who will send u code
> snippets if u ask... 😀
>
> Yes that is what Steve pointed out, suggesting also that for that simple
> exercise you should perform all operations on a spark standalone instead
> (or alt. Use an nfs on the cluster)
> I'd agree with his suggestion....
> I suggest u another alternative:
> https://community.cloud.databricks.com/
>
> That's a ready made cluster and you can run your spark app as well as store
> data on the cluster (well I haven't tried myself but I assume it's
> possible).   Try that out... I will try ur script there as I have an
> account there (though I guess you'll get there before me.....)
>
> Try that out and let me know if u get stuck....
> Kr
>
> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <go...@gmail.com>
> wrote:
>
>> Hi Marco,
>>
>> For the first time in several years, FOR THE VERY FIRST TIME, I am seeing
>> someone actually executing code and providing a response. It feels wonderful
>> that at least someone cared to respond by executing code and did not
>> just filter out each and every technical detail to brood only on my
>> superb social skills, while claiming the reason for ignoring technical
>> details is that it is elementary. I think that Steve also is the first person
>> who could answer the WHY of an elementary question instead of saying that
>> is how it is, and who pointed to the correct documentation.
>>
>> That code works fantastically. But the problem which I have tried to point
>> out occurs while writing out the data and not while reading it.
>>
>>
>> So if you try to read the data from the same folder which has the
>> same file across all the nodes then it will work fine. In fact that is what
>> should work.
>>
>> What does not work is when you try to write back the file and then
>> read it once again from the location you have written to; that is when the
>> issue starts happening.
>>
>> Therefore if in my code you were to save the pandas dataframe as a CSV
>> file and then read it then you will find the following observations:
>>
>> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
>> ------------------------------------------------------------
>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>                             columns=list('ABCD'))
>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>                 header=True, sep=",", index=0)
>> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>> testdf.cache()
>> testdf.count()
>> ------------------------------------------------------------
>>
>>
>> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN WHICH
>> THE DATA DOES NOT EXIST
>> ------------------------------------------------------------
>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>>                             columns=list('ABCD'))
>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>>                 header=True, sep=",", index=0)
>> testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>> testdf.cache()
>> testdf.count()
>> ------------------------------------------------------------
>>
>>
>> if you execute my code then also you will surprisingly see that the
>> writes on the nodes other than the master node do not complete moving
>> the files from the _temporary folder to the main one.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mm...@gmail.com>
>> wrote:
>>
>>> Hello
>>>  please have a look at this. it'sa simple script that just read a
>>> dataframe for n time, sleeping at random interval. i used it to test memory
>>> issues that another user was experiencing on a spark cluster
>>>
>>> you should run it like this e.g
>>> spark-submit dataprocessing_Sample.-2py <path to tree_addhealth.csv>
>>> <num of iterations>
>>>
>>> i ran it on the cluster like this
>>>
>>> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077 \
>>>     /root/pyscripts/dataprocessing_Sample-2.py \
>>>     file:///root/pyscripts/tree_addhealth.csv
>>>
>>> hth, ping me back if you have issues.
>>> I do agree with Steve's comments: if you want to test your Spark
>>> scripts just for playing, do it on a standalone server on your localhost.
>>> Moving to a cluster is just a matter of deploying your script and making sure
>>> you have a common place to read and store the data. The sysadmin
>>> should give you this when they set up the cluster.
>>>
>>> kr
>>>
>>>
>>>
>>>
>>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <
>>> gourav.sengupta@gmail.com> wrote:
>>>
>>>> Hi Marco,
>>>>
>>>> I am sincerely obliged for your kind time and response. Can you please
>>>> try the solution that you have so kindly suggested?
>>>>
>>>> It would be a lot of help if you could kindly execute the code that I
>>>> have given. I don't think that anyone has yet.
>>>>
>>>> There are lots of fine responses to my question here, but if you read
>>>> the last response from Simon, it comes the closest to being satisfactory. I
>>>> am sure even he did not execute the code, but at least he came quite close
>>>> to understanding what the problem is.
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mm...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello
>>>>>  my 2 cents here, hope it helps
>>>>> If you just want to play around with Spark, I'd leave Hadoop out;
>>>>> it's an unnecessary dependency that you don't need for just running a Python
>>>>> script.
>>>>> Instead do the following:
>>>>> - go to the root of your master / slave node and create a directory
>>>>> /root/pyscripts
>>>>> - place your csv file there as well as the python script
>>>>> - run the script to replicate the whole directory across the cluster
>>>>> (I believe it's called copy-script.sh)
>>>>> - then run your spark-submit; it will be something like
>>>>>     ./spark-submit --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077 \
>>>>>         /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10
>>>>>   (options such as --master must come before the script, otherwise they are
>>>>>   passed to the script as arguments)
>>>>> - in your python script, as part of your processing, write the parquet
>>>>> file in directory /root/pyscripts
>>>>>
>>>>> If you have an AWS account and you are comfortable with it (you need
>>>>> to set up bucket permissions etc.), you can just
>>>>> - store your file in one of your S3 buckets
>>>>> - create an EMR cluster
>>>>> - connect to master or slave
>>>>> - run your script that reads from the S3 bucket and writes to the same
>>>>> S3 bucket
>>>>>
>>>>>
>>>>> Feel free to mail me privately, i have a working script i have used to
>>>>> test some code on spark standalone cluster
>>>>> hth
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>
>>>>>> Hi Steve,
>>>>>>
>>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>>
>>>>>> I am now going through the documentation (
>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
>>>>>> and it makes much, much more sense now.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <
>>>>>> stevel@hortonworks.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <go...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Steve,
>>>>>>>
>>>>>>> I have written a sincere note of apology to everyone in a separate
>>>>>>> email. I sincerely request your kind forgiveness before hand if anything
>>>>>>> does sound impolite in my emails, in advance.
>>>>>>>
>>>>>>> Let me first start by thanking you.
>>>>>>>
>>>>>>> I know it looks like I formed all my opinion based on that document,
>>>>>>> but that is not the case at all. If you or anyone tries to execute the code
>>>>>>> that I have given then they will see what I mean. Code speaks louder and
>>>>>>> better than words for me.
>>>>>>>
>>>>>>> So I am not saying you are wrong. I am asking you to verify, and expecting
>>>>>>> that someone will be able to correct the understanding that a moron like
>>>>>>> me has gained after long hours of not having anything better to do.
>>>>>>>
>>>>>>>
>>>>>>> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored
>>>>>>> in HDFS with replication 2, and there is a HADOOP cluster of three nodes. All
>>>>>>> these nodes have SPARK workers (executors) running on them. The files are
>>>>>>> stored in the following way:
>>>>>>> -------------------------------------------
>>>>>>> | SYSTEM 1    | SYSTEM 2    | SYSTEM 3    |
>>>>>>> | (worker1)   | (worker2)   | (worker3)   |
>>>>>>> | (master)    |             |             |
>>>>>>> -------------------------------------------
>>>>>>> | file1.csv   |             | file1.csv   |
>>>>>>> -------------------------------------------
>>>>>>> |             | file2.csv   | file2.csv   |
>>>>>>> -------------------------------------------
>>>>>>> | file3.csv   | file3.csv   |             |
>>>>>>> -------------------------------------------
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>>>>>>> HDFS replication does not store the same file in all the nodes in
>>>>>>> the cluster. So if I have three nodes and the replication is two then the
>>>>>>> same file will be stored physically in two nodes in the cluster. Does that
>>>>>>> sound right?
>>>>>>>
>>>>>>>
>>>>>>> HDFS breaks files up into blocks (default = 128 MB). If a .csv file
>>>>>>> is > 128 MB then it will be broken up into blocks
>>>>>>>
>>>>>>> file1.csv -> [block0001, block0002, block0003]
>>>>>>>
>>>>>>> and each block will be replicated. With replication = 2 there will
>>>>>>> be two copies of each block, but the file itself can span > 2 hosts.
>>>>>>>
>>>>>>>
>>>>>>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>>>>>>> If SPARK is trying to process the records then I am expecting
>>>>>>> that WORKER2 should not be processing file1.csv, and similarly WORKER1
>>>>>>> should not be processing file2.csv and WORKER3 should not be processing
>>>>>>> file3.csv. Because if WORKER2 were to process file1.csv then it
>>>>>>> would cause unnecessary network transmission of the file.
>>>>>>>
>>>>>>>
>>>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>>>> traffic, but it prioritises execution time over waiting for a free worker
>>>>>>> on the node with the data. If a block is on nodes 2 and 3 but the only
>>>>>>> free thread is on node 1, then node 1 gets the work.
>>>>>>>
>>>>>>> There are details on whether/how work across blocks takes place which
>>>>>>> I'm avoiding. For now, know that those formats which are "splittable" will
>>>>>>> have work scheduled by block. If you use Parquet/ORC/Avro for your data and
>>>>>>> compress with Snappy, it will be split. This gives you maximum performance
>>>>>>> as >1 thread can work on different blocks. That is, if file1 is split into
>>>>>>> three blocks, three worker threads can process it.
>>>>>>>
>>>>>>>
>>>>>>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE
>>>>>>> CLARIFY THIS):
>>>>>>> If WORKER2 is not processing file1.csv then how does it matter
>>>>>>> whether the file is there or not at all in the system? Should not SPARK
>>>>>>> just ask the workers to process the files which are available on the worker
>>>>>>> nodes? In case both WORKER2 and WORKER3 fail and are not available then
>>>>>>> file2.csv will not be processed at all.
>>>>>>>
>>>>>>>
>>>>>>> locality is best-effort, not guaranteed.
>>>>>>>
>>>>>>>
>>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD
>>>>>>> BE EXECUTED (It's been pointed out that I am learning SPARK, and even I did
>>>>>>> not take more than 13 mins to set up the cluster and run the code).
>>>>>>>
>>>>>>> Once you execute the code then you will find that:
>>>>>>> 1. if the path starts with file:/// while reading back then there
>>>>>>> is no error reported, but the number of records reported back is only
>>>>>>> the records on the worker that runs on the same machine as the master.
>>>>>>> 2. also you will notice that once you cache the file before writing
>>>>>>> the partitions are distributed nicely across the workers, and while writing
>>>>>>> back, the dataframe partitions are written properly by the worker on the
>>>>>>> master node, but the workers on the other system have their files written
>>>>>>> in the _temporary folder, and these do not get copied back to the main folder.
>>>>>>> In spite of this the job is not reported as failed in SPARK.
>>>>>>>
>>>>>>>
>>>>>>> This gets into the "commit protocol". You don't want to know all the
>>>>>>> dirty details (*) but essentially it's this:
>>>>>>>
>>>>>>> 1. Every worker writes its output to a directory under the
>>>>>>> destination directory, something like
>>>>>>> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>>>>>> 2. It is the Spark driver which "commits" the job by moving the
>>>>>>> output of the individual workers from the temporary directories into
>>>>>>> $dest, then deleting $dest/_temporary
>>>>>>> 3. For which it needs to be able to list all the output in
>>>>>>> $dest/_temporary
>>>>>>>
>>>>>>> In your case, only the output on the same node as the driver is
>>>>>>> being committed, because only those files can be listed and moved. The
>>>>>>> output on the other nodes isn't seen, so isn't committed, nor cleaned up.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Now in my own world, if I see the following things happening,
>>>>>>> something is going wrong (with me):
>>>>>>> 1. SPARK transfers files from different systems to process, instead
>>>>>>> of processing them locally (I do not have code to prove this, and therefore
>>>>>>> it's just an assumption)
>>>>>>> 2. SPARK cannot determine when the writes are failing on standalone
>>>>>>> cluster workers and reports success (code is there for this)
>>>>>>> 3. SPARK reports back the number of records in the worker running on the
>>>>>>> master node when count() is called, without reporting an error, while using
>>>>>>> file:/// and reports an error when I mention the path without
>>>>>>> file:/// (for SPARK 2.1.x onwards, code is there for this)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> As everyone's been saying, file:// requires a shared filestore, with
>>>>>>> uniform paths everywhere. That's needed to list the files to process, read
>>>>>>> the files in the workers and commit the final output. NFS cross-mounting is
>>>>>>> the simplest way to do this, especially as for three nodes HDFS is
>>>>>>> overkill: more services to keep running, no real fault tolerance. Export a
>>>>>>> directory tree from one of the servers, give the rest access to it, and don't
>>>>>>> worry about bandwidth use, as the shared disk itself will become the
>>>>>>> bottleneck.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I very sincerely hope with your genuine help the bar of language and
>>>>>>> social skills will be lowered for me. And everyone will find a way to
>>>>>>> excuse me and not qualify this email as a means to measure my extremely
>>>>>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>>>>>> just focus on the facts related to machines, data, error and (the language
>>>>>>> that I somehow understand better) code.
>>>>>>>
>>>>>>>
>>>>>>> My sincere apologies once again, as I am 100% sure that I did not
>>>>>>> meet the required social and language skills.
>>>>>>>
>>>>>>> Thanks a ton once again for your kindness, patience and
>>>>>>> understanding.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Gourav Sengupta
>>>>>>>
>>>>>>>
>>>>>>> * for the curious, the details of the v1 and v2 commit protocols are at
>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>>>>
>>>>>>> Like I said: you don't want to know the details, and you really
>>>>>>> don't want to step through Hadoop's FileOutputCommitter to see what's going
>>>>>>> on. The Spark side is much easier to follow.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Re: SPARK Issue in Standalone cluster

Posted by Marco Mistroni <mm...@gmail.com>.
Believe me, there are lots of people on this list who will send you code
snippets if you ask... 😀

Yes, that is what Steve pointed out. He also suggested that for such a simple
exercise you should run everything on a single standalone Spark instance
(or, alternatively, use NFS on the cluster).
I agree with his suggestion.
Here is another alternative:
https://community.cloud.databricks.com/

That's a ready-made cluster where you can run your Spark app as well as store
data on the cluster (I haven't tried that myself, but I assume it's
possible). I will try your script there as I have an account
(though I guess you'll get there before me...).

Try that out and let me know if you get stuck.
Kr

On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <go...@gmail.com> wrote:

> Hi Marco,
>
> For the first time in several years, FOR THE VERY FIRST TIME, I am seeing
> someone actually execute code and provide a response. It feels wonderful
> that at least someone chose to respond by executing code, and did not just
> filter out every technical detail to brood only on my superb social skills
> while claiming that the technical details can be ignored because they are
> elementary. I think that Steve is also the first person who could answer
> the WHY of an elementary question, instead of saying that is how it is, and
> who pointed to the correct documentation.
>
> That code works fantastically. But the problem I have been trying to track
> down occurs while writing the data out, not while reading it.
>
>
> So if you try to read data from a folder that has the same file on all the
> nodes, it will work fine. In fact that is what should work.
>
> What does not work is writing the file back and then reading it once again
> from the location you have written to; that is when the issue starts
> happening.
>
> Therefore, if in my code you were to save the pandas dataframe as a CSV
> file and then read it, you will observe the following:
>
> THE FOLLOWING WILL FAIL SINCE THE FILE IS NOT ON ALL THE NODES
> ------------------------------------------------------------
> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>                             columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>                 header=True, sep=",", index=0)
> # load() defaults to Parquet, so name the CSV format explicitly
> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/",
>                          format="csv", header=True)
> testdf.cache()
> testdf.count()
> ------------------------------------------------------------
>
>
> THE FOLLOWING WILL WORK, BUT THE PROCESS WILL NOT USE THE NODE ON WHICH
> THE DATA DOES NOT EXIST
> ------------------------------------------------------------
> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
>                             columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
>                 header=True, sep=",", index=0)
> # load() defaults to Parquet, so name the CSV format explicitly
> testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/",
>                          format="csv", header=True)
> testdf.cache()
> testdf.count()
> ------------------------------------------------------------
>
>
> If you execute my code, you will also see, surprisingly, that the writes
> on the nodes other than the master node never finish moving the
> files from the _temporary folder to the main one.
>
>
> Regards,
> Gourav Sengupta

Re: SPARK Issue in Standalone cluster

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Marco,

For the first time in several years, FOR THE VERY FIRST TIME, I am seeing
someone actually execute code and provide a response. It feels wonderful
that at least someone chose to respond by executing code, and did not just
filter out every technical detail to brood only on my superb social skills
while claiming that the technical details can be ignored because they are
elementary. I think that Steve is also the first person who could answer
the WHY of an elementary question, instead of saying that is how it is, and
who pointed to the correct documentation.

That code works fantastically. But the problem I have been trying to track
down occurs while writing the data out, not while reading it.


So if you try to read data from a folder that has the same file on all the
nodes, it will work fine. In fact that is what should work.

What does not work is writing the file back and then reading it once again
from the location you have written to; that is when the issue starts
happening.
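
The write-then-read behaviour described above is consistent with the commit
protocol Steve outlined earlier in this thread. As a rough illustration only,
the toy simulation below uses two temporary directories to stand in for the
local disks of two machines. It is plain Python, not Spark and not Hadoop's
FileOutputCommitter; the helper names run_task and commit_job, the attempt
directory names and the application attempt id 0 are all made up for the
example.

```python
import shutil
import tempfile
from pathlib import Path


def run_task(node_root: Path, dest: str, task_id: int) -> None:
    """A worker writes its part file under $dest/_temporary on ITS OWN disk."""
    # Hypothetical layout, modelled on '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
    attempt_dir = node_root / dest / "_temporary" / "0" / "_temporary" / f"attempt_{task_id}"
    attempt_dir.mkdir(parents=True)
    (attempt_dir / f"part-{task_id:05d}").write_text(f"rows from task {task_id}\n")


def commit_job(driver_root: Path, dest: str) -> list:
    """The driver moves every part file it can list into $dest, then removes _temporary."""
    dest_dir = driver_root / dest
    dest_dir.mkdir(parents=True, exist_ok=True)
    temp_dir = dest_dir / "_temporary"
    committed = []
    if temp_dir.is_dir():
        # The driver can only list files on the filesystem it is looking at.
        for part in sorted(temp_dir.rglob("part-*")):
            shutil.move(str(part), str(dest_dir / part.name))
            committed.append(part.name)
        shutil.rmtree(temp_dir)
    return committed


with tempfile.TemporaryDirectory() as tmp:
    node1 = Path(tmp) / "node1"  # stands in for the machine running the driver and one worker
    node2 = Path(tmp) / "node2"  # stands in for the machine running only a worker
    run_task(node1, "sparkdata/test2", 0)
    run_task(node2, "sparkdata/test2", 1)

    committed = commit_job(node1, "sparkdata/test2")
    leftover = sorted(p.name for p in (node2 / "sparkdata/test2").rglob("part-*"))
    print("committed on node1:", committed)
    print("still under _temporary on node2:", leftover)
```

With a shared filestore both "nodes" would be the same directory tree, so the
driver would list and commit both part files.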

Therefore, if in my code you were to save the pandas dataframe as a CSV file
and then read it, you will observe the following:

THE FOLLOWING WILL FAIL SINCE THE FILE IS NOT ON ALL THE NODES
------------------------------------------------------------
pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
                            columns=list('ABCD'))
pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
                header=True, sep=",", index=0)
# load() defaults to Parquet, so name the CSV format explicitly
testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/",
                         format="csv", header=True)
testdf.cache()
testdf.count()
------------------------------------------------------------
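
One way to confirm whether the CSV really is present, and identical, on every
machine is to run a small check on each node and compare the lines it prints.
This is only a sketch: describe_file is a hypothetical helper, not part of
Spark, and it assumes the file is small enough to read into memory.

```python
import hashlib
import socket
import sys
from pathlib import Path


def describe_file(path: str) -> str:
    """Return 'host path sha256', or 'host path MISSING' if the file is absent."""
    host = socket.gethostname()
    p = Path(path)
    if not p.is_file():
        return f"{host} {path} MISSING"
    # Reads the whole file; fine for a small test CSV, not for large data.
    digest = hashlib.sha256(p.read_bytes()).hexdigest()
    return f"{host} {path} {digest}"


if __name__ == "__main__":
    # Pass one or more paths on the command line, on every node in turn.
    for arg in sys.argv[1:]:
        print(describe_file(arg))
```

If one node prints MISSING, or a different checksum, for the path that the
other node reports, the workers are not looking at the same data.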


THE FOLLOWING WILL WORK, BUT THE PROCESS WILL NOT USE THE NODE ON WHICH
THE DATA DOES NOT EXIST
------------------------------------------------------------
pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4),
                            columns=list('ABCD'))
pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
                header=True, sep=",", index=0)
# load() defaults to Parquet, so name the CSV format explicitly
testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/",
                         format="csv", header=True)
testdf.cache()
testdf.count()
------------------------------------------------------------


If you execute my code you will also see, surprisingly, that the writes on
the nodes other than the master node never finish moving the files from the
_temporary folder to the main one.
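
One way to see this on each machine is to inspect the output directory
directly. The following is only a rough sketch using the Python standard
library; the function name describe_output_dir is made up for this
illustration, and it assumes the usual layout in which a committed job leaves
part-* files and a _SUCCESS marker in the destination and nothing under
_temporary.

```python
import os


def describe_output_dir(dest):
    """Summarise what a Spark write left in `dest` on the local machine."""
    committed, uncommitted = [], []
    for root, _dirs, files in os.walk(dest):
        rel_root = os.path.relpath(root, dest)
        # Anything still below _temporary was never promoted by the job commit.
        in_temporary = rel_root.split(os.sep)[0] == "_temporary"
        for name in files:
            if in_temporary:
                uncommitted.append(os.path.join(rel_root, name))
            elif name.startswith("part-"):
                committed.append(os.path.normpath(os.path.join(rel_root, name)))
    return {
        # Assumption: the committer writes a _SUCCESS marker on job commit.
        "has_success_marker": os.path.isfile(os.path.join(dest, "_SUCCESS")),
        "committed_files": sorted(committed),
        "uncommitted_files": sorted(uncommitted),
    }
```

Running describe_output_dir on the same path on every machine would show
which nodes still hold files under _temporary.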


Regards,
Gourav Sengupta



On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mm...@gmail.com> wrote:

> Hello
>  please have a look at this. It's a simple script that just reads a
> dataframe n times, sleeping at random intervals. I used it to test memory
> issues that another user was experiencing on a Spark cluster.
>
> You should run it like this, e.g.
> spark-submit dataprocessing_Sample-2.py <path to tree_addhealth.csv> <num
> of iterations>
>
> I ran it on the cluster like this:
>
> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077 /root/pyscripts/dataprocessing_Sample-2.py file:///root/pyscripts/tree_addhealth.csv
>
> hth, ping me back if you have issues.
> I do agree with Steve's comments: if you want to test your Spark
> scripts just for playing, do it on a standalone server on your localhost.
> Moving to a cluster is just a matter of deploying your script and making
> sure you have a common place to read and store the data. The SysAdmin
> should give you this when they set up the cluster.
>
> kr
>
>
>
>

Re: SPARK Issue in Standalone cluster

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Marco,

I am sincerely obliged for your kind time and response. Can you please try
the solution that you have so kindly suggested?

It will be a lot of help if you could kindly execute the code that I have
given. I don't think that anyone has yet.

There are lots of fine responses to my question here, but if you read the
last response from Simon, it comes the closest to being satisfactory. I am
sure even he did not execute the code, but at least he came quite close to
understanding what the problem is.


Regards,
Gourav Sengupta


On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mm...@gmail.com> wrote:

> Hello
>  my 2 cents here, hope it helps
> If you just want to play around with Spark, I'd leave Hadoop out; it's
> an unnecessary dependency that you don't need for just running a Python
> script.
> Instead do the following:
> - go to the root of your master / slave node and create a directory
> /root/pyscripts
> - place your csv file there, as well as the python script
> - run the script to replicate the whole directory across the cluster (I
> believe it's called copy-script.sh)
> - then run your spark-submit; it will be something like
>     ./spark-submit --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077 /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10
> - in your python script, as part of your processing, write the parquet
> file to the directory /root/pyscripts
>
> If you have an AWS account and you are comfortable with it (you need to
> set up bucket permissions etc.), you can just
> - store your file in one of your S3 buckets
> - create an EMR cluster
> - connect to the master or a slave
> - run your script that reads from the S3 bucket and writes to the same S3
> bucket
>
>
> Feel free to mail me privately; I have a working script I have used to
> test some code on a Spark standalone cluster.
> hth
>
>
>
>
>
>
>
>
>
>
> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
> gourav.sengupta@gmail.com> wrote:
>
>> Hi Steve,
>>
>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>
>> I am now going through the documentation
>> (https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
>> and it makes much much more sense now.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <st...@hortonworks.com>
>> wrote:
>>
>>>
>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <go...@gmail.com>
>>> wrote:
>>>
>>> Hi Steve,
>>>
>>> I have written a sincere note of apology to everyone in a separate
>>> email. I sincerely request your kind forgiveness in advance if anything
>>> sounds impolite in my emails.
>>>
>>> Let me first start by thanking you.
>>>
>>> I know it looks like I formed all my opinion based on that document, but
>>> that is not the case at all. If you or anyone tries to execute the code
>>> that I have given then they will see what I mean. Code speaks louder and
>>> better than words for me.
>>>
>>> So I am not saying you are wrong. I am asking you to verify, and expecting
>>> that someone will be able to correct the understanding that a moron like
>>> me has gained after long hours of not having anything better to do.
>>>
>>>
>>> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv,
>>> stored in HDFS with replication 2, and there is a HADOOP cluster of three
>>> nodes. All these nodes have SPARK workers (executors) running on them.
>>> The files are stored in the following way:
>>> ----------------------------------------
>>> | SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
>>> | (worker1)  | (worker2)  | (worker3)  |
>>> | (master)   |            |            |
>>> ----------------------------------------
>>> | file1.csv  |            | file1.csv  |
>>> ----------------------------------------
>>> |            | file2.csv  | file2.csv  |
>>> ----------------------------------------
>>> | file3.csv  | file3.csv  |            |
>>> ----------------------------------------
>>>
>>>
>>>
>>>
>>>
>>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>>> HDFS replication does not store the same file in all the nodes in the
>>> cluster. So if I have three nodes and the replication is two then the same
>>> file will be stored physically in two nodes in the cluster. Does that sound
>>> right?
>>>
>>>
>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is
>>> larger than 128MB then it will be broken up into blocks
>>>
>>> file1.csv -> [block0001, block0002, block0003]
>>>
>>> and each block will be replicated. With replication = 2 there will be
>>> two copies of each block, but the file itself can span > 2 hosts.
>>>
>>>
>>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>>> If SPARK is trying to process the records then I am expecting that
>>> WORKER2 should not be processing file1.csv, and similarly WORKER1 should
>>> not be processing file2.csv and WORKER3 should not be processing file3.csv,
>>> because if WORKER2 were to process file1.csv it would cause unnecessary
>>> network transmission of the file.
>>>
>>>
>>> Spark prefers to schedule work locally, so as to save on network
>>> traffic, but it prioritises execution time over waiting for a free worker
>>> on the node with the data. If a block is on nodes 2 and 3 but there is only
>>> a free thread on node 1, then node 1 gets the work.
>>>
>>> There are details on whether/how work across blocks takes place which I'm
>>> avoiding. For now, know that those formats which are "splittable" will
>>> have work scheduled by block. If you use Parquet/ORC/avro for your data and
>>> compress with snappy, it will be split. This gives you maximum performance
>>> as >1 thread can work on different blocks. That is, if file1 is split into
>>> three blocks, three worker threads can process it.
>>>
>>>
>>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
>>> THIS):
>>> if WORKER2 is not processing file1.csv then how does it matter whether
>>> the file is there or not at all in the system? Should SPARK not just ask
>>> the workers to process the files which are available on the worker nodes?
>>> In case both WORKER2 and WORKER3 fail and are not available then file2.csv
>>> will not be processed at all.
>>>
>>>
>>> locality is best-effort, not guaranteed.
>>>
>>>
>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
>>> EXECUTED (It's been pointed out that I am learning SPARK, and even I did
>>> not take more than 13 mins to set up the cluster and run the code).
>>>
>>> Once you execute the code then you will find that:
>>> 1. if the path starts with file:/// while reading back then there is
>>> no error reported, but the number of records reported back is only the
>>> records on the worker that runs on the same machine as the master.
>>> 2. also you will notice that once you cache the file before writing, the
>>> partitions are distributed nicely across the workers, and while writing
>>> back, the dataframe partitions are written properly on the worker node on
>>> the master machine, but the workers on the other system have the files
>>> written in the _temporary folder, and these do not get copied back to the
>>> main folder. In spite of this the job is not reported as failed in SPARK.
>>>
>>>
>>> This gets into the "commit protocol". You don't want to know all the
>>> dirty details (*) but essentially it's this:
>>>
>>> 1. Every worker writes its output to a directory under the destination
>>> directory, something like
>>> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>> 2. It is the Spark driver which "commits" the job by moving the output
>>> of the individual workers from the temporary directories into $dest, then
>>> deleting $dest/_temporary
>>> 3. For which it needs to be able to list all the output in
>>> $dest/_temporary
>>>
>>> In your case, only the output on the same node as the driver is being
>>> committed, because only those files can be listed and moved. The output on
>>> the other nodes isn't seen, so it is neither committed nor cleaned up.
>>>
>>>
>>>
>>> Now in my own world, if I see the following things happening,
>>> something is going wrong (with me):
>>> 1. SPARK transfers files from different systems to process, instead of
>>> processing them locally (I do not have code to prove this, and therefore
>>> it's just an assumption)
>>> 2. SPARK cannot determine when the writes are failing on the workers of a
>>> standalone cluster and reports success (code is there for this)
>>> 3. SPARK reports back the number of records on the worker running on the
>>> master node when count() is called, without reporting an error, while using
>>> file:///, and reports an error when I give the path without file:///
>>> (for SPARK 2.1.x onwards, code is there for this)
>>>
>>>
>>>
>>> As everyone's been saying, file:// requires a shared filestore, with
>>> uniform paths everywhere. That's needed to list the files to process, read
>>> the files in the workers and commit the final output. NFS cross-mounting is
>>> the simplest way to do this, especially as for three nodes HDFS is
>>> overkill: more services to keep running, no real fault tolerance. Export a
>>> directory tree from one of the servers, give the rest access to it, don't
>>> worry about bandwidth use as the shared disk itself will become the
>>> bottleneck
>>>
>>>
>>>
>>> I very sincerely hope with your genuine help the bar of language and
>>> social skills will be lowered for me. And everyone will find a way to
>>> excuse me and not qualify this email as a means to measure my extremely
>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>> just focus on the facts related to machines, data, error and (the language
>>> that I somehow understand better) code.
>>>
>>>
>>> My sincere apologies once again, as I am 100% sure that I did not meet
>>> the required social and language skills.
>>>
>>> Thanks a ton once again for your kindness, patience and understanding.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>> * for the curious, the details of the v1 and v2 commit protocols are at
>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>
>>> Like I said: you don't want to know the details, and you really don't
>>> want to step through Hadoop's FileOutputCommitter to see what's going on.
>>> The Spark side is much easier to follow.
>>>
>>>
>>
>
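
The two points quoted above from Steve (files are split into replicated
blocks, and locality is best-effort) can be sketched with a toy model. This
is only an illustration in plain Python: the round-robin replica placement
and the function names are assumptions made up for this sketch, not how HDFS
or the Spark scheduler is actually implemented, and the sketch ignores any
time a real scheduler might wait for a local slot.

```python
BLOCK_SIZE = 128 * 1024 * 1024  # default block size mentioned in the thread


def split_into_blocks(file_size, block_size=BLOCK_SIZE):
    """Number of blocks needed for a file: ceil(file_size / block_size)."""
    return -(-file_size // block_size)


def place_replicas(num_blocks, nodes, replication=2):
    """Toy round-robin placement (assumption; real HDFS placement differs)."""
    return {
        block: [nodes[(block + i) % len(nodes)] for i in range(replication)]
        for block in range(num_blocks)
    }


def schedule(placement, free_slots):
    """Prefer a node holding the block; otherwise take any node with a free slot.

    Returns {block: (node, is_local)}, or None for a block when no slot is free.
    """
    slots = dict(free_slots)
    assignment = {}
    for block, holders in placement.items():
        local = [node for node in holders if slots.get(node, 0) > 0]
        candidates = local or [node for node, free in slots.items() if free > 0]
        if not candidates:
            assignment[block] = None
            continue
        chosen = candidates[0]
        slots[chosen] -= 1
        assignment[block] = (chosen, chosen in holders)
    return assignment
```

For example, a 300MB file becomes three blocks; if only node1 has free
threads, the block stored on node2 and node3 is still processed on node1 and
has to be read over the network.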

Re: SPARK Issue in Standalone cluster

Posted by Jean Georges Perrin <jg...@jgp.net>.
I use CIFS; it works reasonably well, is easy to use cross-platform, and is well documented...


Re: SPARK Issue in Standalone cluster

Posted by Steve Loughran <st...@hortonworks.com>.
> On 3 Aug 2017, at 19:59, Marco Mistroni <mm...@gmail.com> wrote:
> 
> Hello
>  my 2 cents here, hope it helps
> If you just want to play around with Spark, I'd leave Hadoop out; it's an unnecessary dependency that you don't need for just running a Python script.
> Instead do the following:
> - go to the root of your master / slave node and create a directory /root/pyscripts
> - place your csv file there, as well as the python script
> - run the script to replicate the whole directory across the cluster (I believe it's called copy-script.sh)
> - then run your spark-submit; it will be something like
>     ./spark-submit --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077 /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10
> - in your python script, as part of your processing, write the parquet file to the directory /root/pyscripts
> 

That's going to hit the commit problem discussed: only the Spark driver executes the final commit process, so the output from the other servers doesn't get picked up and promoted. You need a shared store (NFS is the easy one).
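
The effect can be reproduced without Spark. The sketch below is a simplified
model only, assuming two separate directories stand in for the local disks of
two machines and that the commit is a plain "list and move" step; it is not
the actual FileOutputCommitter code, and all function names are invented for
the illustration.

```python
import os
import shutil
import tempfile


def task_write(node_root, dest, task_id, data):
    """A worker writes its partition under dest/_temporary on its OWN disk."""
    attempt_dir = os.path.join(node_root, dest, "_temporary", "0", "_temporary", task_id)
    os.makedirs(attempt_dir)
    with open(os.path.join(attempt_dir, "part-" + task_id), "w") as handle:
        handle.write(data)


def driver_commit(node_root, dest):
    """The driver promotes only the files it can list on the disk it sees."""
    dest_dir = os.path.join(node_root, dest)
    temp_dir = os.path.join(dest_dir, "_temporary")
    for root, _dirs, files in os.walk(temp_dir):
        for name in files:
            shutil.move(os.path.join(root, name), os.path.join(dest_dir, name))
    shutil.rmtree(temp_dir, ignore_errors=True)
    open(os.path.join(dest_dir, "_SUCCESS"), "w").close()


def list_files(top):
    """Relative paths of all files below `top`, sorted."""
    found = []
    for root, _dirs, files in os.walk(top):
        for name in files:
            found.append(os.path.relpath(os.path.join(root, name), top))
    return sorted(found)


def simulate():
    base = tempfile.mkdtemp()
    node_a = os.path.join(base, "node_a")  # driver and worker 1
    node_b = os.path.join(base, "node_b")  # worker 2 only
    dest = os.path.join("sparkdata", "test2")
    task_write(node_a, dest, "00000", "rows 0-4999")
    task_write(node_b, dest, "00001", "rows 5000-9999")
    driver_commit(node_a, dest)  # the driver never sees node_b's disk
    listing = {
        "node_a": list_files(os.path.join(node_a, dest)),
        "node_b": list_files(os.path.join(node_b, dest)),
    }
    shutil.rmtree(base)
    return listing
```

In this model node_a ends up with the committed part file and a _SUCCESS
marker, while the part file written on node_b stays under _temporary and no
error is raised anywhere.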


> If you have an AWS account and you are comfortable with it (you need to set up bucket permissions etc.), you can just
> - store your file in one of your S3 buckets
> - create an EMR cluster
> - connect to the master or a slave
> - run your script that reads from the S3 bucket and writes to the same S3 bucket


Aah, and now we are into the problem of implementing a safe commit protocol for an inconsistent filesystem....

My current stance is that out-of-the-box S3 isn't safe to use as the direct output of work; Azure is. S3 mostly works for a small experiment, but I wouldn't use it in production.

Simplest: work on one machine; if you go to 2-3 machines for exploratory work, use NFS.
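
Before relying on a shared directory, it may help to check that the same
path is really visible from the machines that run tasks. The helper below is
a hedged sketch: probe_path and hosts_missing_path are hypothetical names,
the path /shared/sparkdata is a placeholder, and the commented Spark usage
assumes an existing SparkSession called spark. Spark does not guarantee that
the probe tasks land on every worker, so an empty result is a hint rather
than a proof.

```python
import os
import socket


def probe_path(path):
    """Return (hostname, exists) for the machine this function runs on."""
    return (socket.gethostname(), os.path.exists(path))


def hosts_missing_path(probes):
    """Given (hostname, exists) pairs, list the hosts that could not see the path."""
    return sorted({host for host, exists in probes if not exists})


# With a live SparkSession named `spark` (not created here), the probe could
# be run inside tasks so that it executes on the workers, not on the driver:
#
#   probes = (spark.sparkContext
#             .parallelize(range(100), 100)
#             .map(lambda _: probe_path("/shared/sparkdata"))
#             .distinct()
#             .collect())
#   print(hosts_missing_path(probes))
```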


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: SPARK Issue in Standalone cluster

Posted by Marco Mistroni <mm...@gmail.com>.
Hello
 my 2 cents here, hope it helps
If you just want to play around with Spark, I'd leave Hadoop out; it's
an unnecessary dependency that you don't need for just running a Python
script.
Instead do the following:
- go to the root of your master / slave node and create a directory
/root/pyscripts
- place your csv file there, as well as the python script
- run the script to replicate the whole directory across the cluster (I
believe it's called copy-script.sh)
- then run your spark-submit; it will be something like
    ./spark-submit --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077 /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10
- in your python script, as part of your processing, write the parquet file
to the directory /root/pyscripts

If you have an AWS account and you are comfortable with it (you need to
set up bucket permissions etc.), you can just
- store your file in one of your S3 buckets
- create an EMR cluster
- connect to the master or a slave
- run your script that reads from the S3 bucket and writes to the same S3
bucket


Feel free to mail me privately; I have a working script I have used to test
some code on a Spark standalone cluster.
hth










On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi Steve,
>
> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>
> I am now going through the documentation
> (https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
> and it makes much much more sense now.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <st...@hortonworks.com>
> wrote:
>
>>
>> On 2 Aug 2017, at 20:05, Gourav Sengupta <go...@gmail.com>
>> wrote:
>>
>> Hi Steve,
>>
>> I have written a sincere note of apology to everyone in a separate email.
>> I sincerely request your kind forgiveness in advance if anything sounds
>> impolite in my emails.
>>
>> Let me first start by thanking you.
>>
>> I know it looks like I formed all my opinion based on that document, but
>> that is not the case at all. If you or anyone tries to execute the code
>> that I have given then they will see what I mean. Code speaks louder and
>> better than words for me.
>>
>> So I am not saying you are wrong. I am asking you to verify, and expecting
>> that someone will be able to correct the understanding that a moron like
>> me has gained after long hours of not having anything better to do.
>>
>>
>> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv,
>> stored in HDFS with replication 2, and there is a HADOOP cluster of three
>> nodes. All these nodes have SPARK workers (executors) running on them.
>> The files are stored in the following way:
>> ----------------------------------------
>> | SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
>> | (worker1)  | (worker2)  | (worker3)  |
>> | (master)   |            |            |
>> ----------------------------------------
>> | file1.csv  |            | file1.csv  |
>> ----------------------------------------
>> |            | file2.csv  | file2.csv  |
>> ----------------------------------------
>> | file3.csv  | file3.csv  |            |
>> ----------------------------------------
>>
>>
>>
>>
>>
>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>> HDFS replication does not store the same file in all the nodes in the
>> cluster. So if I have three nodes and the replication is two then the same
>> file will be stored physically in two nodes in the cluster. Does that sound
>> right?
>>
>>
>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is
>> larger than 128MB then it will be broken up into blocks
>>
>> file1.csv -> [block0001, block0002, block0003]
>>
>> and each block will be replicated. With replication = 2 there will be two
>> copies of each block, but the file itself can span > 2 hosts.
>>
>>
>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>> If SPARK is trying to process the records then I am expecting that
>> WORKER2 should not be processing file1.csv, and similarly WORKER1 should
>> not be processing file2.csv and WORKER3 should not be processing file3.csv,
>> because if WORKER2 were to process file1.csv it would cause unnecessary
>> network transmission of the file.
>>
>>
>> Spark prefers to schedule work locally, so as to save on network traffic,
>> but it schedules for execution time over waiting for workers free on the
>> node with the data. IF a block is on nodes 2 and 3 but there is only a free
>> thread on node 1, then node 1 gets the work
>>
>> There's details on whether/how work across blocks takes place which I'm
>> avoiding. For now know those formats which are "splittable" will have work
>> scheduled by block. If you use Parquet/ORC/avro for your data and compress
>> with snappy, it will be split. This gives you maximum performance as >1
>> thread can work on different blocks. That is, if file1 is split into three
>> blocks, three worker threads can process it.
>>
>>
>> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
>> THIS):
>> if WORKER 2 is not processing file1.csv then how does it matter whether
>> the file is there or not at all in the system? Should not SPARK just ask
>> the workers to process the files which are avialable in the worker nodes?
>> In case both WORKER2 and WORKER3 fails and are not available then file2.csv
>> will not be processed at all.
>>
>>
>> locality is best-effort, not guaranteed.
>>
>>
>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
>> EXECUTED (Its been pointed out that I am learning SPARK, and even I did not
>> take more than 13 mins to set up the cluster and run the code).
>>
>> Once you execute the code then you will find that:
>> 1.  if the path starts with file:/// while reading back then there is no
>> error reported, but the number of records reported back are only those
>> records in the worker which also has the server.
>> 2. also you will notice that once you cache the file before writing the
>> partitions are ditributed nicely across the workers, and while writing
>> back, the dataframe partitions does write properly to the worker node in
>> the Master, but the workers in the other system have the files written in
>> _temporary folder which does not get copied back to the main folder.
>> Inspite of this the job is not reported as failed in SPARK.
>>
>>
>> This gets into the "commit protocol". You don't want to know all the
>> dirty details (*) but essentially its this
>>
>> 1. Every worker writes its output to a directory under the destination
>> directory, something like
>> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>> 2. it is the spark driver which "commits" the job by moving the output
>> from the individual workers from the temporary directories into $dest, then
>> deleting $dest/_temporary
>> 3. For which it needs to be able to list all the output in
>> $dest/_temporary
>>
>> In your case, only the output on the same node of the driver is being
>> committed, because only those files can be listed and moved. The output on
>> the other nodes isn't seen, so isn't committed, nor cleaned up.
>>
>>
>>
>> Now in my own world, if I see, the following things are happening,
>> something is going wrong (with me):
>> 1. SPARK transfers files from different systems to process, instead of
>> processing them locally (I do not have code to prove this, and therefore
>> its just an assumption)
>> 2. SPARK cannot determine when the writes are failing in standalone
>> clusters workers and reports success (code is there for this)
>> 3. SPARK reports back number of records in the worker running in the
>> master node when count() is given without reporting an error while using
>> file:/// and reports an error when I mention the path without file:///
>> (for SPARK 2.1.x onwards, code is there for this)
>>
>>
>>
>> As everyone's been saying, file:// requires a shared filestore, with
>> uniform paths everywhere. That's needed to list the files to process, read
>> the files in the workers and commit the final output. NFS cross-mounting is
>> the simplest way to do this, especially as for three nodes HDFS is
>> overkill: more services to keep running, no real fault tolerance. Export a
>> directory tree from one of the servers, give the rest access to it, don't
>> worry about bandwidth use as the shared disk itself will become the
>> bottleneck
>>
>>
>>
>> I very sincerely hope with your genuine help the bar of language and
>> social skills will be lowered for me. And everyone will find a way to
>> excuse me and not qualify this email as a means to measure my extremely
>> versatile and amazingly vivid social skills. It will be a lot of help to
>> just focus on the facts related to machines, data, error and (the language
>> that I somehow understand better) code.
>>
>>
>> My sincere apologies once again, as I am 100% sure that I did not meet
>> the required social and language skills.
>>
>> Thanks a ton once again for your kindness, patience and understanding.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>> * for the curious, the details of the v1 and v2 commit protocols are
>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>
>> Like I said: you don't want to know the details, and you really don't
>> want to step through Hadoop's FileOutputCommitter to see what's going on.
>> The Spark side is much easier to follow.
>>
>>
>

Re: SPARK Issue in Standalone cluster

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Steve,

I love you mate, thanks a ton once again for ACTUALLY RESPONDING.

I am now going through the documentation (
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
and it makes much much more sense now.

Regards,
Gourav Sengupta

On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <st...@hortonworks.com>
wrote:

>
> On 2 Aug 2017, at 20:05, Gourav Sengupta <go...@gmail.com>
> wrote:
>
> Hi Steve,
>
> I have written a sincere note of apology to everyone in a separate email.
> I sincerely request your kind forgiveness before hand if anything does
> sound impolite in my emails, in advance.
>
> Let me first start by thanking you.
>
> I know it looks like I formed all my opinion based on that document, but
> that is not the case at all. If you or anyone tries to execute the code
> that I have given then they will see what I mean. Code speaks louder and
> better than words for me.
>
> So I am not saying you are wrong. I am asking verify and expecting someone
> will be able to correct  a set of understanding that a moron like me has
> gained after long hours of not having anything better to do.
>
>
> SCENARIO: there are two files file1.csv and file2.csv stored in HDFS with
> replication 2 and there is a HADOOP cluster of three nodes. All these nodes
> have SPARK workers (executors) running in them.  Both are stored in the
> following way:
> -----------------------------------------------------
> | SYSTEM 1 |  SYSTEM 2 | SYSTEM 3 |
> | (worker1)   |  (worker2)    |  (worker3)   |
> | (master)     |                     |                    |
> -----------------------------------------------------
> | file1.csv      |                     | file1.csv     |
> -----------------------------------------------------
> |                    |  file2.csv      | file2.csv     |
> -----------------------------------------------------
> | file3.csv      |  file3.csv      |                   |
> -----------------------------------------------------
>
>
>
>
>
> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
> HDFS replication does not store the same file in all the nodes in the
> cluster. So if I have three nodes and the replication is two then the same
> file will be stored physically in two nodes in the cluster. Does that sound
> right?
>
>
> HDFS breaks files up into blocks (default = 128MB). If a .csv file is >
> 128 then it will be broken up into blocks
>
> file1.cvs -> [block0001, block002, block0003]
>
> and each block will be replicated. With replication = 2 there will be two
> copies of each block, but the file itself can span > 2 hosts.
>
>
> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
> If SPARK is trying to process to the records then I am expecting that
> WORKER2 should not be processing file1.csv, and similary WORKER 1 should
> not be processing file2.csv and WORKER3 should not be processing file3.csv.
> Because in case WORKER2 was trying to process file1.csv then it will
> actually causing network transmission of the file unnecessarily.
>
>
> Spark prefers to schedule work locally, so as to save on network traffic,
> but it schedules for execution time over waiting for workers free on the
> node with the data. IF a block is on nodes 2 and 3 but there is only a free
> thread on node 1, then node 1 gets the work
>
> There's details on whether/how work across blocks takes place which I'm
> avoiding. For now know those formats which are "splittable" will have work
> scheduled by block. If you use Parquet/ORC/avro for your data and compress
> with snappy, it will be split. This gives you maximum performance as >1
> thread can work on different blocks. That is, if file1 is split into three
> blocks, three worker threads can process it.
>
>
> ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
> THIS):
> if WORKER 2 is not processing file1.csv then how does it matter whether
> the file is there or not at all in the system? Should not SPARK just ask
> the workers to process the files which are avialable in the worker nodes?
> In case both WORKER2 and WORKER3 fails and are not available then file2.csv
> will not be processed at all.
>
>
> locality is best-effort, not guaranteed.
>
>
> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
> EXECUTED (Its been pointed out that I am learning SPARK, and even I did not
> take more than 13 mins to set up the cluster and run the code).
>
> Once you execute the code then you will find that:
> 1.  if the path starts with file:/// while reading back then there is no
> error reported, but the number of records reported back are only those
> records in the worker which also has the server.
> 2. also you will notice that once you cache the file before writing the
> partitions are ditributed nicely across the workers, and while writing
> back, the dataframe partitions does write properly to the worker node in
> the Master, but the workers in the other system have the files written in
> _temporary folder which does not get copied back to the main folder.
> Inspite of this the job is not reported as failed in SPARK.
>
>
> This gets into the "commit protocol". You don't want to know all the dirty
> details (*) but essentially its this
>
> 1. Every worker writes its output to a directory under the destination
> directory, something like
> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
> 2. it is the spark driver which "commits" the job by moving the output
> from the individual workers from the temporary directories into $dest, then
> deleting $dest/_temporary
> 3. For which it needs to be able to list all the output in $dest/_temporary
>
> In your case, only the output on the same node of the driver is being
> committed, because only those files can be listed and moved. The output on
> the other nodes isn't seen, so isn't committed, nor cleaned up.
>
>
>
> Now in my own world, if I see, the following things are happening,
> something is going wrong (with me):
> 1. SPARK transfers files from different systems to process, instead of
> processing them locally (I do not have code to prove this, and therefore
> its just an assumption)
> 2. SPARK cannot determine when the writes are failing in standalone
> clusters workers and reports success (code is there for this)
> 3. SPARK reports back number of records in the worker running in the
> master node when count() is given without reporting an error while using
> file:/// and reports an error when I mention the path without file:///
> (for SPARK 2.1.x onwards, code is there for this)
>
>
>
> As everyone's been saying, file:// requires a shared filestore, with
> uniform paths everywhere. That's needed to list the files to process, read
> the files in the workers and commit the final output. NFS cross-mounting is
> the simplest way to do this, especially as for three nodes HDFS is
> overkill: more services to keep running, no real fault tolerance. Export a
> directory tree from one of the servers, give the rest access to it, don't
> worry about bandwidth use as the shared disk itself will become the
> bottleneck
>
>
>
> I very sincerely hope with your genuine help the bar of language and
> social skills will be lowered for me. And everyone will find a way to
> excuse me and not qualify this email as a means to measure my extremely
> versatile and amazingly vivid social skills. It will be a lot of help to
> just focus on the facts related to machines, data, error and (the language
> that I somehow understand better) code.
>
>
> My sincere apologies once again, as I am 100% sure that I did not meet the
> required social and language skills.
>
> Thanks a ton once again for your kindness, patience and understanding.
>
>
> Regards,
> Gourav Sengupta
>
>
> * for the curious, the details of the v1 and v2 commit protocols are
> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>
> Like I said: you don't want to know the details, and you really don't want
> to step through Hadoop's FileOutputCommitter to see what's going on. The
> Spark side is much easier to follow.
>
>

Re: SPARK Issue in Standalone cluster

Posted by Steve Loughran <st...@hortonworks.com>.
On 2 Aug 2017, at 20:05, Gourav Sengupta <go...@gmail.com> wrote:

Hi Steve,

I have written a sincere note of apology to everyone in a separate email. I sincerely request your kind forgiveness in advance if anything sounds impolite in my emails.

Let me first start by thanking you.

I know it looks like I formed all my opinion based on that document, but that is not the case at all. If you or anyone tries to execute the code that I have given then they will see what I mean. Code speaks louder and better than words for me.

So I am not saying you are wrong. I am asking you to verify, and expecting that someone will be able to correct the understanding that a moron like me has gained after long hours of not having anything better to do.


SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored in HDFS with replication 2, and there is a HADOOP cluster of three nodes. All these nodes have SPARK workers (executors) running on them. The files are stored in the following way:
-------------------------------------------
| SYSTEM 1    | SYSTEM 2    | SYSTEM 3    |
| (worker1)   | (worker2)   | (worker3)   |
| (master)    |             |             |
-------------------------------------------
| file1.csv   |             | file1.csv   |
-------------------------------------------
|             | file2.csv   | file2.csv   |
-------------------------------------------
| file3.csv   | file3.csv   |             |
-------------------------------------------





CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
HDFS replication does not store the same file in all the nodes in the cluster. So if I have three nodes and the replication is two then the same file will be stored physically in two nodes in the cluster. Does that sound right?


HDFS breaks files up into blocks (default = 128 MB). If a .csv file is > 128 MB then it will be broken up into blocks

file1.csv -> [block0001, block0002, block0003]

and each block will be replicated. With replication = 2 there will be two copies of each block, but the file itself can span > 2 hosts.
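
As a rough illustration of the block arithmetic above, here is a small sketch. The function names are made up for this example, the 128 MB default is the figure quoted in the text, and real HDFS placement involves more than this count.

```python
MB = 1024 * 1024
BLOCK_SIZE = 128 * MB  # default HDFS block size quoted above


def block_count(file_size_bytes, block_size=BLOCK_SIZE):
    """Number of blocks needed for a file (hypothetical helper)."""
    # Integer ceiling division: a partly filled last block still counts.
    return -(-file_size_bytes // block_size)


def stored_block_copies(file_size_bytes, replication=2, block_size=BLOCK_SIZE):
    """Total block copies held across the cluster for one file."""
    return block_count(file_size_bytes, block_size) * replication


# A 300 MB csv needs three blocks; with replication = 2 there are six
# block copies, which can be spread over more than two hosts.
print(block_count(300 * MB), stored_block_copies(300 * MB))
```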


ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
If SPARK is trying to process the records then I am expecting that WORKER2 should not be processing file1.csv, and similarly WORKER1 should not be processing file2.csv and WORKER3 should not be processing file3.csv. Because if WORKER2 were to process file1.csv, it would cause unnecessary network transmission of the file.


Spark prefers to schedule work locally, so as to save on network traffic, but it prioritises execution time over waiting for a free worker on a node with the data. If a block is on nodes 2 and 3 but there is only a free thread on node 1, then node 1 gets the work.

There are details on whether/how work across blocks takes place which I'm avoiding. For now, know that formats which are "splittable" will have work scheduled by block. If you use Parquet/ORC/Avro for your data and compress with Snappy, it will be split. This gives you maximum performance as >1 thread can work on different blocks. That is, if file1 is split into three blocks, three worker threads can process it.
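
The scheduling preference described above can be sketched as a toy function. This is not Spark's scheduler: `pick_node` and its arguments are hypothetical, and it ignores the delay Spark applies before giving up on locality (the `spark.locality.wait` setting). It only shows the "local if possible, otherwise whoever is free" idea.

```python
def pick_node(block_hosts, free_threads):
    """Choose a node for one task (toy model, not Spark's real logic).

    block_hosts:  set of node names holding a replica of the block
    free_threads: dict of node name -> number of idle threads
    Returns (node, locality), or (None, None) if nothing is free.
    """
    # First choice: a node that holds the block and has an idle thread.
    for node in sorted(block_hosts):
        if free_threads.get(node, 0) > 0:
            return node, "NODE_LOCAL"
    # Otherwise any idle node gets the work and reads the block remotely.
    for node in sorted(free_threads):
        if free_threads[node] > 0:
            return node, "ANY"
    return None, None


# The block is on nodes 2 and 3, but only node 1 has a free thread.
print(pick_node({"node2", "node3"}, {"node1": 1, "node2": 0, "node3": 0}))
```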


ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY THIS):
If WORKER2 is not processing file1.csv then how does it matter whether the file is present on that system at all? Should SPARK not just ask the workers to process the files which are available on the worker nodes? In case both WORKER2 and WORKER3 fail and are not available then file2.csv will not be processed at all.


Locality is best-effort, not guaranteed.


ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE EXECUTED (It's been pointed out that I am learning SPARK, and even I did not take more than 13 mins to set up the cluster and run the code).

Once you execute the code then you will find that:
1. If the path starts with file:/// while reading back then no error is reported, but the number of records reported back covers only those records on the worker which runs on the same system as the master.
2. You will also notice that once you cache the dataframe before writing, the partitions are distributed nicely across the workers. While writing back, the dataframe partitions are written properly on the worker node that shares a system with the Master, but the workers on the other system have their files written in a _temporary folder which does not get copied back to the main folder. In spite of this the job is not reported as failed in SPARK.

This gets into the "commit protocol". You don't want to know all the dirty details (*) but essentially it's this:

1. Every worker writes its output to a directory under the destination directory, something like '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'.
2. It is the Spark driver which "commits" the job by moving the output of the individual workers from the temporary directories into $dest, then deleting $dest/_temporary.
3. For this it needs to be able to list all the output in $dest/_temporary.

In your case, only the output on the same node as the driver is being committed, because only those files can be listed and moved. The output on the other nodes isn't seen, so isn't committed, nor cleaned up.
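
To make the failure mode concrete, here is a small pure-Python simulation of those three steps. It is only a sketch under stated assumptions: each "node" is a separate temporary directory standing in for a local disk, the helper names are invented, and the directory layout is simplified compared with what Hadoop's FileOutputCommitter really does.

```python
import os
import shutil
import tempfile


def task_write(node_root, dest, task_id, data):
    """A worker writes its task output under $dest/_temporary on its OWN disk."""
    tmp_dir = os.path.join(node_root, dest, "_temporary", "0", task_id)
    os.makedirs(tmp_dir, exist_ok=True)
    with open(os.path.join(tmp_dir, "part-" + task_id), "w") as f:
        f.write(data)


def driver_commit(driver_root, dest):
    """The driver moves whatever it can list under $dest/_temporary into $dest."""
    dest_dir = os.path.join(driver_root, dest)
    tmp_root = os.path.join(dest_dir, "_temporary")
    committed = []
    for dirpath, _dirs, files in os.walk(tmp_root):
        for name in files:
            shutil.move(os.path.join(dirpath, name), os.path.join(dest_dir, name))
            committed.append(name)
    shutil.rmtree(tmp_root, ignore_errors=True)
    return sorted(committed)


def simulate(shared):
    """Run two tasks on two nodes; the driver lives on node1."""
    base = tempfile.mkdtemp()
    node1 = os.path.join(base, "node1")
    # With a shared filestore both nodes see the same directory tree.
    node2 = node1 if shared else os.path.join(base, "node2")
    task_write(node1, "out", "00000", "a")
    task_write(node2, "out", "00001", "b")
    committed = driver_commit(node1, "out")
    # Is uncommitted output still sitting in _temporary on node2?
    leftover = os.path.isdir(os.path.join(node2, "out", "_temporary"))
    shutil.rmtree(base)
    return committed, leftover


print("local disks :", simulate(shared=False))
print("shared store:", simulate(shared=True))
```

With separate local disks the driver commits only its own node's part file and the other node keeps its _temporary directory, with no error raised; with a shared tree both parts are committed and nothing is left behind.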



Now in my own world, if I see the following things happening, something is going wrong (with me):
1. SPARK transfers files from different systems to process, instead of processing them locally (I do not have code to prove this, and therefore it's just an assumption)
2. SPARK cannot determine when writes are failing on the workers of a standalone cluster and reports success (code is there for this)
3. SPARK reports back the number of records on the worker running on the master node when count() is called, without reporting an error, while using file:///, and reports an error when I give the path without file:/// (for SPARK 2.1.x onwards, code is there for this)


As everyone's been saying, file:// requires a shared filestore, with uniform paths everywhere. That's needed to list the files to process, read the files in the workers and commit the final output. NFS cross-mounting is the simplest way to do this, especially as for three nodes HDFS is overkill: more services to keep running, no real fault tolerance. Export a directory tree from one of the servers, give the rest access to it, and don't worry about bandwidth use, as the shared disk itself will become the bottleneck.
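
A minimal sketch of what that looks like from the PySpark side, assuming a directory exported over NFS and mounted at the same path on every machine. The mount point /mnt/shared is hypothetical, the helper names are invented for this example, and mode("overwrite") is an addition so the snippet can be rerun.

```python
from pathlib import PurePosixPath


def shared_uri(mount_point, relative):
    """Build a file:// URI for a path under the shared mount.

    mount_point must be the SAME absolute path on the driver and on
    every worker (here a hypothetical NFS mount).
    """
    return (PurePosixPath(mount_point) / relative).as_uri()


def write_then_count(spark, df, uri):
    """Write a DataFrame to the shared location and read it back.

    `spark` is an existing SparkSession and `df` a DataFrame, as in the
    original example; nothing here starts a cluster.
    """
    df.write.mode("overwrite").save(uri)
    return spark.read.load(uri).count()


print(shared_uri("/mnt/shared", "sparkdata/test2"))
```

With the session and dataframe from the original post, the call would be write_then_count(spark, testdf, shared_uri("/mnt/shared", "sparkdata/test2")); if the mount really is shared, the count should come back as the full number of rows.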



I very sincerely hope with your genuine help the bar of language and social skills will be lowered for me. And everyone will find a way to excuse me and not qualify this email as a means to measure my extremely versatile and amazingly vivid social skills. It will be a lot of help to just focus on the facts related to machines, data, error and (the language that I somehow understand better) code.


My sincere apologies once again, as I am 100% sure that I did not meet the required social and language skills.

Thanks a ton once again for your kindness, patience and understanding.


Regards,
Gourav Sengupta


* for the curious, the details of the v1 and v2 commit protocols are
https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md

Like I said: you don't want to know the details, and you really don't want to step through Hadoop's FileOutputCommitter to see what's going on. The Spark side is much easier to follow.


Re: SPARK Issue in Standalone cluster

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Steve,

I have written a sincere note of apology to everyone in a separate email. I
sincerely request your kind forgiveness in advance if anything sounds
impolite in my emails.

Let me first start by thanking you.

I know it looks like I formed all my opinion based on that document, but
that is not the case at all. If you or anyone tries to execute the code
that I have given then they will see what I mean. Code speaks louder and
better than words for me.

So I am not saying you are wrong. I am asking you to verify, and expecting
that someone will be able to correct the understanding that a moron like me
has gained after long hours of not having anything better to do.


SCENARIO: there are three files, file1.csv, file2.csv and file3.csv, stored
in HDFS with replication 2, and there is a HADOOP cluster of three nodes. All
these nodes have SPARK workers (executors) running on them. The files are
stored in the following way:
-------------------------------------------
| SYSTEM 1    | SYSTEM 2    | SYSTEM 3    |
| (worker1)   | (worker2)   | (worker3)   |
| (master)    |             |             |
-------------------------------------------
| file1.csv   |             | file1.csv   |
-------------------------------------------
|             | file2.csv   | file2.csv   |
-------------------------------------------
| file3.csv   | file3.csv   |             |
-------------------------------------------

CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
HDFS replication does not store the same file in all the nodes in the
cluster. So if I have three nodes and the replication is two then the same
file will be stored physically in two nodes in the cluster. Does that sound
right?

ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
If SPARK is trying to process the records then I am expecting that WORKER2
should not be processing file1.csv, and similarly WORKER1 should not be
processing file2.csv and WORKER3 should not be processing file3.csv. Because
if WORKER2 were to process file1.csv, it would cause unnecessary network
transmission of the file.

ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
THIS):
If WORKER2 is not processing file1.csv then how does it matter whether the
file is present on that system at all? Should SPARK not just ask the workers
to process the files which are available on the worker nodes? In case both
WORKER2 and WORKER3 fail and are not available then file2.csv will not be
processed at all.

ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
EXECUTED (It's been pointed out that I am learning SPARK, and even I did not
take more than 13 mins to set up the cluster and run the code).

Once you execute the code then you will find that:
1. If the path starts with file:/// while reading back then no error is
reported, but the number of records reported back covers only those records
on the worker which runs on the same system as the master.
2. You will also notice that once you cache the dataframe before writing,
the partitions are distributed nicely across the workers. While writing
back, the dataframe partitions are written properly on the worker node that
shares a system with the Master, but the workers on the other system have
their files written in a _temporary folder which does not get copied back to
the main folder. In spite of this the job is not reported as failed in SPARK.

Now in my own world, if I see the following things happening, something is
going wrong (with me):
1. SPARK transfers files from different systems to process, instead of
processing them locally (I do not have code to prove this, and therefore
it's just an assumption)
2. SPARK cannot determine when writes are failing on the workers of a
standalone cluster and reports success (code is there for this)
3. SPARK reports back the number of records on the worker running on the
master node when count() is called, without reporting an error, while using
file:///, and reports an error when I give the path without file:/// (for
SPARK 2.1.x onwards, code is there for this)


I very sincerely hope with your genuine help the bar of language and social
skills will be lowered for me. And everyone will find a way to excuse me
and not qualify this email as a means to measure my extremely versatile and
amazingly vivid social skills. It will be a lot of help to just focus on
the facts related to machines, data, error and (the language that I somehow
understand better) code.


My sincere apologies once again, as I am 100% sure that I did not meet the
required social and language skills.

Thanks a ton once again for your kindness, patience and understanding.


Regards,
Gourav Sengupta



On Wed, Aug 2, 2017 at 4:59 PM, Steve Loughran <st...@hortonworks.com>
wrote:

>
> On 2 Aug 2017, at 14:25, Gourav Sengupta <go...@gmail.com>
> wrote:
>
> Hi,
>
> I am definitely sure that at this point of time everyone who has kindly
> cared to respond to my query do need to go and check this link
> https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode.
>
>
> I see. Well, we shall have to edit that document to make clear something
> which had been omitted:
>
> *in order for multiple spark workers to process data, they must have a
> shared store for that data, one with read/write access for all workers.
> This must be provided by a shared filesystem: HDFS, network-mounted NFS,
> Glusterfs, through an object store (S3, Azure WASB, ...), or through
> alternative datastores implementing the Hadoop Filesystem API (example:
> Apache Cassandra).*
>
> In your case, for a small cluster of 1-3 machines, especially if you are
> just learning to play with spark, I'd start with an NFS mounted disk
> accessible on the same path on all machines. If you aren't willing to set
> that up, stick to spark standalone on a single machine first. You don't
> need a shared cluster to use spark standalone.
>
> Personally, I'd recommend downloading apache zeppelin and running it
> locally as the simplest out-the-box experience.
>
>
> It does mention that SPARK standalone cluster can have multiple machines
> running as slaves.
>
>
> Clearly it omits the small detail about the requirement for a shared store.
>
> The general idea of writing to the user group is that people who know
> should answer, and not those who do not know.
>
>
> Agreed, but if the answer doesn't appear to be correct to you, do consider
> that there may be some detail that hasn't been mentioned, rather than
> immediately concluding that the person replying is wrong.
>
> -Steve
>
>
>
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker <
> mahesh_sawaiker@persistent.com> wrote:
>
>> Gourav,
>>
>> Riccardo’s answer is spot on.
>>
>> What is happening is one node of spark is writing to its own directory
>> and telling a slave to read the data from there, when the slave goes to
>> read it, the part is not found.
>>
>>
>>
>> Check the folder
>> Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
>> on the slave.
>>
>> The reason it ran on spark 1.5 may have been because the executor ran on
>> the driver itself. There is not much use to a set up where you don’t have
>> some kind of distributed file system, so I would encourage you to use hdfs,
>> or a mounted file system shared by all nodes.
>>
>>
>>
>> Regards,
>>
>> Mahesh
>>
>>
>>
>>
>>
>> *From:* Gourav Sengupta [mailto:gourav.sengupta@gmail.com]
>> *Sent:* Monday, July 31, 2017 9:54 PM
>> *To:* Riccardo Ferrari
>> *Cc:* user
>> *Subject:* Re: SPARK Issue in Standalone cluster
>>
>>
>>
>> Hi Riccardo,
>>
>>
>>
>> I am grateful for your kind response.
>>
>>
>>
>> Also I am sure that your answer is completely wrong and erroneous. SPARK
>> must be having a method so that different executors do not pick up the same
>> files to process. You also did not answer the question why was the
>> processing successful in SPARK 1.5 and not in SPARK 2.2.
>>
>>
>>
>> Also the exact same directory is present on both the nodes.
>>
>>
>>
>> I feel quite fascinated when individuals respond before even understanding
>> the issue, or trying out the code.
>>
>>
>>
>> It will be of great help if someone could kindly read my email and help
>> me figure out the issue.
>>
>>
>>
>>
>>
>> Regards,
>>
>> Gourav Sengupta
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <fe...@gmail.com>
>> wrote:
>>
>> Hi Gourav,
>>
>>
>>
>> The issue here is the location where you're trying to write/read from :
>> /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
>>
>> When dealing with clusters all the paths and resources should be
>> available to all executors (and driver), and that is reason why you
>> generally use HDFS, S3, NFS or any shared file system.
>>
>>
>>
>> Spark assumes your data is generally available to all nodes and does not
>> try to pick up the data from a selected node; rather, it tries to
>> write/read in parallel from the executor nodes. Also, given its control
>> logic there is no way (read: you should not care) to know what executor is
>> doing what task.
>>
>>
>>
>> Hope it helps,
>>
>> Riccardo
>>
>>
>>
>> On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <
>> gourav.sengupta@gmail.com> wrote:
>>
>> Hi,
>>
>>
>>
>> I am working by creating a native SPARK standalone cluster (
>> https://spark.apache.org/docs/2.2.0/spark-standalone.html)
>>
>>
>>
>> Therefore I  do not have a HDFS.
>>
>>
>>
>>
>>
>> EXERCISE:
>>
>> Its the most fundamental and simple exercise. Create a sample SPARK
>> dataframe and then write it to a location and then read it back.
>>
>>
>>
>> SETTINGS:
>>
>> So after I have installed SPARK in two physical systems with the same:
>>
>> 1. SPARK version,
>>
>> 2. JAVA version,
>>
>> 3. PYTHON_PATH
>>
>> 4. SPARK_HOME
>>
>> 5. PYSPARK_PYTHON
>>
>> the user in both the systems is the root user therefore there are no
>> permission issues anywhere.
>>
>>
>>
>> I am able to start:
>>
>> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
>>
>> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
>> computers)
>>
>>
>>
>> After that I can see in the spark UI (at port 8080) two workers.
>>
>>
>>
>>
>>
>> CODE:
>>
>> Then I run the following code:
>>
>>
>>
>> ======================================================
>>
>> import findspark
>>
>> import os
>>
>> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
>>
>> findspark.init()
>>
>> import pyspark
>>
>> from pyspark.sql import SparkSession
>>
>> spark = (SparkSession.builder
>>
>>         .master("spark://mastersystem.local:7077")
>>
>>         .appName("gouravtest")
>>
>>         .enableHiveSupport()
>>
>>         .getOrCreate())
>>
>> import pandas, numpy
>>
>> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(10000,
>> 4), columns=list('ABCD')))
>>
>> testdf.cache()
>>
>> testdf.count()
>>
>> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
>>
>> spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
>>
>> ======================================================
>>
>>
>>
>>
>>
>> ERROR I (in above code):
>>
>> ERROR in line: testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
>>
>> This line does not fail or report any error. But when I look at the stage
>> in the Spark application UI, the error shown below is reported for the
>> slave node that is not on the same system as the master node. The write on
>> the slave node that is on the same physical system as the master happens
>> correctly. (NOTE: here the slave node is basically the worker, and the
>> master node is the driver.)
>>
>> ------------------------------------------------------------
>> ----------------------------------------------------------------------
>>
>> 0 (TID 41). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000006_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000006_0: Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000028_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000028_0: Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000021_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000021_0: Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 2103 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000018_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000018_0: Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000029_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000029_0: Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000027_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000027_0: Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID 54). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000010_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID 52). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000010_0: Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000030_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID 55). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000030_0: Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID 53). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID 61). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000016_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000016_0: Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID 59). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID 51). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000024_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000024_0: Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000023_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000023_0: Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID 62). 2103 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID 43). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID 49). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID 60). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID 63). 2103 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID 56). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID 57). 2060 bytes result sent to driver
>>
>> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65
>>
>> 17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)
>>
>> 17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3
>>
>> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 24.9 KB, free 365.9 MB)
>>
>> 17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 ms
>>
>> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 70.3 KB, free 365.9 MB)
>>
>> 17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
>>
>> java.io.FileNotFoundException: File file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet does not exist
>>
>>           at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
>>
>>           at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>>
>>           at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>>
>>           at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>>
>>           at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
>>
>>           at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
>>
>>           at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>
>>           at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
>>
>>           at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
>>
>>           at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
>>
>>           at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)
>>
>>           at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
>>
>>           at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
>>
>>           at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
>>
>>           at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>
>>           at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>>
>>           at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>>
>>           at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>>
>>           at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>>           at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
>>
>>           at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
>>
>>           at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
>>
>>           at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
>>
>>           at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
>>
>>           at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>
>>           at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>>
>>           at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>>
>>           at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>>
>>           at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>>           at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>
>>           at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>>           at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66
>>
>>
>>
>> ------------------------------------------------------------
>> ----------------------------------------------------------------------
>>
>>
>>
>>
>>
>> ERROR II  (in above code):
>>
>> While trying to read the files, a distinct error is now thrown, which says
>> the same thing: that the files do not exist.
>>
>>
>>
>> Also, why is SPARK trying to search for the same files on both systems? If
>> the same path on the two systems has different files, should SPARK not
>> combine and work on them?
>>
>>
>>
>>
>>
>>
>>
>> NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
>>
>> I started spark using the same method but now using SPARK 1.5 and this
>> does not give any error:
>>
>> ======================================================
>>
>> import findspark
>>
>> import os
>>
>> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
>>
>> findspark.init()
>>
>> import pyspark
>>
>>
>>
>> sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
>>
>> sqlContext = pyspark.SQLContext(sc)
>>
>> import pandas, numpy
>>
>> testdf = sqlContext.createDataFrame(pandas.DataFrame(numpy.random.randn(10000,
>> 4), columns=list('ABCD')))
>>
>> testdf.cache()
>>
>> testdf.count()
>>
>> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test3")
>>
>> sqlContext.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test3").count()
>>
>> ======================================================
>>
>>
>>
>> I will be sincerely obliged if someone could kindly help me out with this
>> issue and point out my mistakes/ assumptions.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Regards,
>>
>> Gourav Sengupta
>>
>>
>>
>>
>> DISCLAIMER
>> ==========
>> This e-mail may contain privileged and confidential information which is
>> the property of Persistent Systems Ltd. It is intended only for the use of
>> the individual or entity to which it is addressed. If you are not the
>> intended recipient, you are not authorized to read, retain, copy, print,
>> distribute or use this message. If you have received this communication in
>> error, please notify the sender and delete all copies of this message.
>> Persistent Systems Ltd. does not accept any liability for virus infected
>> mails.
>>
>
>
>

Re: SPARK Issue in Standalone cluster

Posted by Steve Loughran <st...@hortonworks.com>.
On 2 Aug 2017, at 14:25, Gourav Sengupta <go...@gmail.com> wrote:

Hi,

I am quite sure that, at this point, everyone who has kindly cared to respond to my query needs to go and check this link: https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode.

I see. Well, we shall have to edit that document to make clear something which had been omitted:

In order for multiple Spark workers to process data, they must have a shared store for that data, one with read/write access for all workers. This must be provided by a shared filesystem (HDFS, network-mounted NFS, GlusterFS), through an object store (S3, Azure WASB, ...), or through alternative datastores implementing the Hadoop Filesystem API (example: Apache Cassandra).

In your case, for a small cluster of 1-3 machines, especially if you are just learning to play with Spark, I'd start with an NFS-mounted disk accessible on the same path on all machines. If you aren't willing to set that up, stick to Spark standalone on a single machine first. You don't need a shared cluster to use Spark standalone.
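
As a rough illustration of the advice above, here is a minimal sketch of a pre-flight check that flags a node-local output path when the master URL points at a multi-machine cluster. The helper names (`is_local_master`, `needs_shared_store`) are hypothetical and not part of Spark, and the check is only a heuristic based on the URI scheme: it cannot tell whether a plain path happens to be an NFS mount visible on every node.

```python
from urllib.parse import urlparse

# Schemes that resolve to a node-local disk unless the path is a shared mount.
LOCAL_SCHEMES = {"", "file"}


def is_local_master(master: str) -> bool:
    """True for single-machine masters such as 'local', 'local[4]' or 'local[*]'."""
    return master == "local" or master.startswith("local[")


def needs_shared_store(master: str, path: str) -> bool:
    """Heuristic (hypothetical helper, not a Spark API).

    Returns True when a multi-machine master is combined with a path that has
    no scheme or a 'file' scheme. A True result is only a warning: the path may
    still be a shared mount that this check cannot detect.
    """
    scheme = urlparse(path).scheme
    return (not is_local_master(master)) and scheme in LOCAL_SCHEMES


if __name__ == "__main__":
    master = "spark://mastersystem.local:7077"
    path = "/Users/gouravsengupta/Development/spark/sparkdata/test2"
    if needs_shared_store(master, path):
        print("Warning: %s must be visible at the same path on every node" % path)
```

With the master URL and output path from the original post, the check returns True; with a single-machine master such as `local[*]`, or with a path on a shared store (for example an `hdfs://` URI on a hypothetical namenode), it returns False.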

Personally, I'd recommend downloading Apache Zeppelin and running it locally as the simplest out-of-the-box experience.


It does mention that SPARK standalone cluster can have multiple machines running as slaves.


Clearly it omits the small detail about the requirement for a shared store.

The general idea of writing to the user group is that people who know should answer, and not those who do not know.

Agreed, but if the answer doesn't appear to be correct to you, do consider that there may be some detail that hasn't been mentioned, rather than immediately concluding that the person replying is wrong.

-Steve





Regards,
Gourav Sengupta

On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker <ma...@persistent.com> wrote:
Gourav,
Riccardo’s answer is spot on.
What is happening is that one Spark node writes to its own local directory and then tells a slave to read the data from there; when the slave goes to read it, the part file is not found.

Check whether the file /Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet exists on the slave.
The reason it ran on Spark 1.5 may have been that the executor ran on the same machine as the driver. There is not much use for a setup without some kind of distributed file system, so I would encourage you to use HDFS, or a mounted file system shared by all nodes.
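
The failure mode described above can be reproduced without Spark at all. The following is a toy simulation, not Spark's actual commit or scheduling logic: two temporary directories stand in for the private disks of two machines (the names `master_host` and `remote_host` are made up), each "node" writes its own part file locally, and the file listing taken on one node is then opened on the other.

```python
import os
import tempfile


def simulate_unshared_write_then_read():
    """Toy model: two 'nodes', each with a private copy of the same directory path."""
    with tempfile.TemporaryDirectory() as root:
        # Each node sees the output path as a directory on its own disk.
        node_dirs = {
            name: os.path.join(root, name, "sparkdata", "test1")
            for name in ("master_host", "remote_host")
        }
        for directory in node_dirs.values():
            os.makedirs(directory)

        # Write phase: each node writes its own part file to its local disk.
        for i, directory in enumerate(node_dirs.values()):
            with open(os.path.join(directory, "part-%05d.parquet" % i), "w") as f:
                f.write("rows")

        # Read phase: the file listing is taken on the master host only...
        listing = sorted(os.listdir(node_dirs["master_host"]))

        # ...and the remote host is asked to open those names on its own disk.
        missing = [
            name for name in listing
            if not os.path.exists(os.path.join(node_dirs["remote_host"], name))
        ]
        return listing, missing


if __name__ == "__main__":
    listing, missing = simulate_unshared_write_then_read()
    print("listed on master_host:", listing)
    print("not found on remote_host:", missing)
```

Every file listed on one host is missing on the other, which is the same shape of problem as the FileNotFoundException in the executor log. Pointing both "nodes" at one shared directory makes the missing list empty.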

Regards,
Mahesh


From: Gourav Sengupta [mailto:gourav.sengupta@gmail.com]
Sent: Monday, July 31, 2017 9:54 PM
To: Riccardo Ferrari
Cc: user
Subject: Re: SPARK Issue in Standalone cluster

Hi Riccardo,

I am grateful for your kind response.

Also, I am sure that your answer is completely wrong and erroneous. SPARK must have a method so that different executors do not pick up the same files to process. You also did not answer the question of why the processing was successful in SPARK 1.5 and not in SPARK 2.2.

Also, the exact same directory is present on both nodes.

I feel quite fascinated when individuals respond before even understanding the issue, or trying out the code.

It will be of great help if someone could kindly read my email and help me figure out the issue.


Regards,
Gourav Sengupta



On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <fe...@gmail.com> wrote:
Hi Gourav,

The issue here is the location where you're trying to write/read from: /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters, all the paths and resources should be available to all executors (and the driver), and that is the reason why you generally use HDFS, S3, NFS or any shared file system.

Spark assumes your data is generally available to all nodes and does not try to pick up the data from a selected node; rather, it tries to write/read in parallel from the executor nodes. Also, given its control logic, there is no way (read: you should not care) to know which executor is doing which task.

Hope it helps,
Riccardo

On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <go...@gmail.com> wrote:
Hi,

I am working by creating a native SPARK standalone cluster (https://spark.apache.org/docs/2.2.0/spark-standalone.html)

Therefore I  do not have a HDFS.


EXERCISE:
It's the most fundamental and simple exercise. Create a sample SPARK dataframe and then write it to a location and then read it back.

SETTINGS:
So after I have installed SPARK in two physical systems with the same:
1. SPARK version,
2. JAVA version,
3. PYTHON_PATH
4. SPARK_HOME
5. PYSPARK_PYTHON
the user in both the systems is the root user therefore there are no permission issues anywhere.

I am able to start:
1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate computers)

After that I can see in the spark UI (at port 8080) two workers.


CODE:
Then I run the following code:

======================================================
import findspark
import os
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
        .master("spark://mastersystem.local:7077")
        .appName("gouravtest")
        .enableHiveSupport()
        .getOrCreate())
import pandas, numpy
testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
testdf.cache()
testdf.count()
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
======================================================


ERROR I (in above code):
ERROR in line: testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
This line does not fail or report any error. But when I look at the stage in the Spark application UI, the error shown below is reported for the slave node that is not on the same system as the master node. The write on the slave node that is on the same physical system as the master happens correctly. (NOTE: here the slave node is basically the worker, and the master node is the driver.)
----------------------------------------------------------------------------------------------------------------------------------

0 (TID 41). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000006_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000006_0: Committed

17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000028_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000028_0: Committed

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000021_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000021_0: Committed

17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 2103 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000018_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000018_0: Committed

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000029_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000029_0: Committed

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000027_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000027_0: Committed

17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID 54). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000010_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010

17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID 52). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000010_0: Committed

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000030_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030

17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID 55). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000030_0: Committed

17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID 53). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID 61). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000016_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000016_0: Committed

17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID 59). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID 51). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000024_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000024_0: Committed

17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000023_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023

17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000023_0: Committed

17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID 62). 2103 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID 43). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID 49). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID 60). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID 63). 2103 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID 56). 2060 bytes result sent to driver

17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID 57). 2060 bytes result sent to driver

17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65

17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)

17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3

17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 24.9 KB, free 365.9 MB)

17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 ms

17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 70.3 KB, free 365.9 MB)

17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)

java.io.FileNotFoundException: File file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet does not exist

          at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)

          at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)

          at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)

          at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)

          at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)

          at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)

          at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)

          at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)

          at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)

          at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)

          at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)

          at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)

          at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)

          at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)

          at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)

          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)

          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)

          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)

          at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)

          at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)

          at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)

          at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)

          at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)

          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

          at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)

          at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)

          at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)

          at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)

          at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)

          at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)

          at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)

          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)

          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)

          at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)

          at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)

          at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)

          at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)

          at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)

          at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)

          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66

----------------------------------------------------------------------------------------------------------------------------------


ERROR II  (in above code):
While trying to read the files back, an explicit error is now thrown with the same message: the files do not exist.

Also, why is SPARK searching for the same files on both systems? If the same path holds different files on the two systems, should SPARK not combine them and work on the result?



NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
I started spark using the same method but now using SPARK 1.5 and this does not give any error:
======================================================
import findspark
import os
os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
findspark.init()
import pyspark

sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
sqlContext = pyspark.SQLContext(sc)
import pandas, numpy
testdf = sqlContext.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
testdf.cache()
testdf.count()
testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test3")
sqlContext.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test3").count()
======================================================

I would be sincerely obliged if someone could kindly help me with this issue and point out any mistakes or wrong assumptions on my part.




Regards,
Gourav Sengupta




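A minimal sketch of the check that the FileNotFoundException above points to: run on each machine, it lists which part files actually exist under the output directory on that machine's own disk. The helper names `list_part_files` and `missing_parts` are hypothetical, and the sketch assumes only that output files carry a `part-` prefix, as in the log above. It uses the Python standard library and does not need Spark.

```python
from pathlib import Path


def list_part_files(output_dir):
    """Return the sorted names of the part files directly under output_dir.

    Returns an empty list if the directory does not exist on this machine.
    """
    root = Path(output_dir)
    if not root.is_dir():
        return []
    return sorted(
        p.name
        for p in root.iterdir()
        if p.is_file() and p.name.startswith("part-")
    )


def missing_parts(expected_names, output_dir):
    """Return the expected part-file names that are absent from output_dir."""
    present = set(list_part_files(output_dir))
    return sorted(name for name in expected_names if name not in present)
```

Calling `list_part_files` with the output path on both machines and comparing the two results would show whether each machine holds only the parts its own executor wrote.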

Re: SPARK Issue in Standalone cluster

Posted by Frank Austin Nothaft <fn...@berkeley.edu>.
> The general idea of writing to the user group is that people who know should answer, and not those who do not know. 

I’d also add that if you’re going to write to the user group, you should be polite to people who try to answer your queries, even if you think they’re wrong.

This is especially true if the people you think are wrong are actually correct.

Frank Austin Nothaft
fnothaft@berkeley.edu
fnothaft@eecs.berkeley.edu
202-340-0466

> On Aug 2, 2017, at 6:25 AM, Gourav Sengupta <go...@gmail.com> wrote:
> 
> Hi,
> 
> I am definitely sure that, at this point, everyone who has kindly cared to respond to my query needs to go and check this link: https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode
> 
> It does mention that a SPARK standalone cluster can have multiple machines running as slaves. 
> 
> The general idea of writing to the user group is that people who know should answer, and not those who do not know. 
> 
> 
> 
> Regards,
> Gourav Sengupta
> 
> On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker <mahesh_sawaiker@persistent.com> wrote:
> Gourav,
> 
> Riccardo’s answer is spot on.
> 
> What is happening is that one Spark node writes to its own directory and then tells a slave to read the data from there. When the slave goes to read it, the part file is not found.
> 
>  
> 
> Check the folder Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet on the slave.
> 
> The reason it ran on Spark 1.5 may be that the executor ran on the driver machine itself. A setup without some kind of distributed file system is of little use, so I would encourage you to use HDFS or a mounted file system shared by all nodes.
> 
>  
> 
> Regards,
> 
> Mahesh
> 
>  
> 
>  
> 
> From: Gourav Sengupta [mailto:gourav.sengupta@gmail.com]
> Sent: Monday, July 31, 2017 9:54 PM
> To: Riccardo Ferrari
> Cc: user
> Subject: Re: SPARK Issue in Standalone cluster
> 
>  
> 
> Hi Riccardo,
> 
>  
> 
> I am grateful for your kind response.
> 
>  
> 
> Also I am sure that your answer is completely wrong and erroneous. SPARK must have a mechanism so that different executors do not pick up the same files to process. You also did not answer the question of why the processing was successful in SPARK 1.5 and not in SPARK 2.2.
> 
> 
> 
> Also, the exact same directory is present on both the nodes.
> 
> 
> 
> I feel quite fascinated when individuals respond before even understanding the issue, or trying out the code.
> 
>  
> 
> It will be of great help if someone could kindly read my email and help me figure out the issue.
> 
>  
> 
>  
> 
> Regards,
> 
> Gourav Sengupta
> 
>  
> 
>  
> 
>  
> 
> On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <ferrarir@gmail.com> wrote:
> 
> Hi Gourav,
> 
>  
> 
> The issue here is the location you're trying to write to and read from: /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
> 
> When dealing with clusters, all paths and resources should be available to all executors (and the driver), and that is the reason why you generally use HDFS, S3, NFS or any shared file system.
> 
> 
> 
> Spark assumes your data is generally available to all nodes and does not try to pick up the data from a selected node; rather, it tries to write/read in parallel from the executor nodes. Also, given its control logic, there is no way to know (read: you should not care) which executor is doing which task.
> 
>  
> 
> Hope it helps,
> 
> Riccardo
> 

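The explanation from Riccardo and Mahesh quoted above can be illustrated with a toy simulation. It does not use Spark: two temporary directories stand in for the local disks of two machines, each "node" writes one part file under the same relative path, and each part is then read from the other node. All function names here are hypothetical, and this is only a sketch of the idea, not of how Spark schedules or commits tasks.

```python
import tempfile
from pathlib import Path


def write_partition(node_dir, relative_out, part_name, payload):
    """Simulate one executor writing one part file to its node's disk."""
    out = Path(node_dir) / relative_out
    out.mkdir(parents=True, exist_ok=True)
    (out / part_name).write_text(payload)


def read_partition(node_dir, relative_out, part_name):
    """Simulate one executor opening a part file by path on its own disk."""
    return (Path(node_dir) / relative_out / part_name).read_text()


def simulate(shared):
    """Write two parts from two 'nodes', then read each part from the other node.

    With shared=True both nodes see one directory (like NFS or HDFS);
    with shared=False each node has a private directory (like a local disk).
    Returns the names of the parts that could not be read.
    """
    with tempfile.TemporaryDirectory() as base:
        node_a = Path(base) / "node_a"
        node_b = node_a if shared else Path(base) / "node_b"
        node_a.mkdir()
        node_b.mkdir(exist_ok=True)
        out = "sparkdata/test"
        write_partition(node_a, out, "part-00000", "rows 0-4999")
        write_partition(node_b, out, "part-00001", "rows 5000-9999")
        failures = []
        # Each part is read by the node that did NOT write it.
        for reader, part in ((node_b, "part-00000"), (node_a, "part-00001")):
            try:
                read_partition(reader, out, part)
            except FileNotFoundError:
                failures.append(part)
        return failures
```

With `simulate(shared=True)` every read succeeds, while `simulate(shared=False)` fails for both parts with the same kind of "file does not exist" error that appears in the executor log.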

Re: SPARK Issue in Standalone cluster

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

I am definitely sure that, at this point, everyone who has kindly cared to
respond to my query needs to go and check this link:
https://spark.apache.org/docs/2.2.0/spark-standalone.html#spark-standalone-mode

It does mention that a SPARK standalone cluster can have multiple machines
running as slaves.

The general idea of writing to the user group is that people who know
should answer, and not those who do not know.



Regards,
Gourav Sengupta

On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker <
mahesh_sawaiker@persistent.com> wrote:

> Gourav,
>
> Riccardo’s answer is spot on.
>
> What is happening is that one Spark node writes to its own directory and
> then tells a slave to read the data from there. When the slave goes to
> read it, the part file is not found.
>
>
>
> Check the folder Users/gouravsengupta/Development/spark/sparkdata/
> test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet on
> the slave.
>
> The reason it ran on Spark 1.5 may be that the executor ran on the driver
> machine itself. A setup without some kind of distributed file system is of
> little use, so I would encourage you to use HDFS or a mounted file system
> shared by all nodes.
>
>
>
> Regards,
>
> Mahesh
>
>
>
>
>
> *From:* Gourav Sengupta [mailto:gourav.sengupta@gmail.com]
> *Sent:* Monday, July 31, 2017 9:54 PM
> *To:* Riccardo Ferrari
> *Cc:* user
> *Subject:* Re: SPARK Issue in Standalone cluster
>
>
>
> Hi Riccardo,
>
>
>
> I am grateful for your kind response.
>
>
>
> Also I am sure that your answer is completely wrong and erroneous. SPARK
> must have a mechanism so that different executors do not pick up the same
> files to process. You also did not answer the question of why the
> processing was successful in SPARK 1.5 and not in SPARK 2.2.
>
>
>
> Also, the exact same directory is present on both the nodes.
>
>
>
> I feel quite fascinated when individuals respond before even understanding
> the issue, or trying out the code.
>
>
>
> It will be of great help if someone could kindly read my email and help me
> figure out the issue.
>
>
>
>
>
> Regards,
>
> Gourav Sengupta
>
>
>
>
>
>
>
> On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <fe...@gmail.com>
> wrote:
>
> Hi Gourav,
>
>
>
> The issue here is the location you're trying to write to and read from:
> /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
>
> When dealing with clusters, all paths and resources should be available
> to all executors (and the driver), and that is the reason why you generally
> use HDFS, S3, NFS or any shared file system.
>
>
>
> Spark assumes your data is generally available to all nodes and does not
> try to pick up the data from a selected node; rather, it tries to
> write/read in parallel from the executor nodes. Also, given its control
> logic, there is no way to know (read: you should not care) which executor
> is doing which task.
>
>
>
> Hope it helps,
>
> Riccardo
>
>
>
> On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <
> gourav.sengupta@gmail.com> wrote:
>
> Hi,
>
>
>
> I am working by creating a native SPARK standalone cluster (
> https://spark.apache.org/docs/2.2.0/spark-standalone.html)
>
>
>
> Therefore I  do not have a HDFS.
>
>
>
>
>
> EXERCISE:
>
> Its the most fundamental and simple exercise. Create a sample SPARK
> dataframe and then write it to a location and then read it back.
>
>
>
> SETTINGS:
>
> So after I have installed SPARK in two physical systems with the same:
>
> 1. SPARK version,
>
> 2. JAVA version,
>
> 3. PYTHON_PATH
>
> 4. SPARK_HOME
>
> 5. PYSPARK_PYTHON
>
> the user in both the systems is the root user therefore there are no
> permission issues anywhere.
>
>
>
> I am able to start:
>
> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
>
> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
> computers)
>
>
>
> After that I can see in the spark UI (at port 8080) two workers.
>
>
>
>
>
> CODE:
>
> Then I run the following code:
>
>
>
> ======================================================
>
> import findspark
>
> import os
>
> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
>
> findspark.init()
>
> import pyspark
>
> from pyspark.sql import SparkSession
>
> spark = (SparkSession.builder
>
>         .master("spark://mastersystem.local:7077")
>
>         .appName("gouravtest")
>
>         .enableHiveSupport()
>
>         .getOrCreate())
>
> import pandas, numpy
>
> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
>
> testdf.cache()
>
> testdf.count()
>
> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
>
> spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
>
> ======================================================
>
>
>
>
>
> ERROR I (in above code):
>
> ERROR in line: testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
>
> This line does not fail or report any error. But when I am looking at the
> stage in spark Application UI the error reported for one of the slave node
> which is not in the same system as the master node is mentioned below. The
> writing on the slave node which is in the same physical system as the
> Master happens correctly. (NOTE: slave node basically the worker and master
> node the driver)
>
> ----------------------------------------------------------------------------------------------------------------------------------
>
> 0 (TID 41). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000006_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000006_0: Committed
>
> 17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000028_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000028_0: Committed
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000021_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000021_0: Committed
>
> 17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 2103 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000018_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000018_0: Committed
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000029_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000029_0: Committed
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000027_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000027_0: Committed
>
> 17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID 54). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000010_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010
>
> 17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID 52). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000010_0: Committed
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000030_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030
>
> 17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID 55). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000030_0: Committed
>
> 17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID 53). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID 61). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000016_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000016_0: Committed
>
> 17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID 59). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID 51). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000024_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000024_0: Committed
>
> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000023_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023
>
> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000023_0: Committed
>
> 17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID 62). 2103 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID 43). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID 49). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID 60). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID 63). 2103 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID 56). 2060 bytes result sent to driver
>
> 17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID 57). 2060 bytes result sent to driver
>
> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65
>
> 17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)
>
> 17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3
>
> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 24.9 KB, free 365.9 MB)
>
> 17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 ms
>
> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 70.3 KB, free 365.9 MB)
>
> 17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
>
> java.io.FileNotFoundException: File file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet does not exist
>
>           at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
>
>           at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>
>           at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>
>           at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>
>           at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
>
>           at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
>
>           at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>
>           at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
>
>           at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
>
>           at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
>
>           at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)
>
>           at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
>
>           at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
>
>           at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
>
>           at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
>
>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>
>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>
>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>
>           at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>
>           at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
>
>           at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>
>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>
>           at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>
>           at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>           at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
>
>           at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
>
>           at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
>
>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
>
>           at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
>
>           at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
>
>           at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
>
>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>
>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>
>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>
>           at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>
>           at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
>
>           at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>
>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>
>           at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>
>           at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
>           at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
>           at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>           at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66
>
> ----------------------------------------------------------------------------------------------------------------------------------
>
> ERROR II (in above code):
>
> While trying to read the files back, a separate error is thrown that says
> the same thing: the files do not exist.
>
> Also, why is SPARK trying to search for the same files on both systems?
> If the same path on the two systems holds different files, should SPARK not
> combine them and work on them?
>
> NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
>
> I started Spark using the same method, but this time with SPARK 1.5, and this
> does not give any error:
>
> ======================================================
> import findspark
> import os
> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
> findspark.init()
> import pyspark
>
> sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
> sqlContext = pyspark.SQLContext(sc)
> import pandas, numpy
> testdf = sqlContext.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
> testdf.cache()
> testdf.count()
> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test3")
> sqlContext.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test3").count()
> ======================================================
>
> I will be sincerely obliged if someone could kindly help me out with this
> issue and point out my mistakes/assumptions.
>
> Regards,
> Gourav Sengupta
>

RE: SPARK Issue in Standalone cluster

Posted by Mahesh Sawaiker <ma...@persistent.com>.
Gourav,
Riccardo’s answer is spot on.
What is happening is that one Spark node writes to its own local directory, and a slave is then told to read the data from that same path. When the slave goes to read it, the part file is not found, because it was written on a different machine.

Check whether /Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet exists on the slave.
The reason it ran on Spark 1.5 may have been that the executor ran on the same machine as the driver. There is not much use for a setup without some kind of distributed file system, so I would encourage you to use HDFS or a mounted file system shared by all nodes.
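
One rough way to confirm this would be to run a small script like the one below on each machine and compare the output. It is only a sketch using the Python standard library: the helper name list_part_files is made up for illustration, and the directory is the one from the FileNotFoundException in the log.

```python
import glob
import os


def list_part_files(output_dir):
    """Return the sorted names of the part files found in output_dir on this machine."""
    pattern = os.path.join(output_dir, "part-*")
    return sorted(os.path.basename(p) for p in glob.glob(pattern))


if __name__ == "__main__":
    # Directory taken from the FileNotFoundException in the executor log.
    output_dir = "/Users/gouravsengupta/Development/spark/sparkdata/test1"
    names = list_part_files(output_dir)
    print(len(names), "part files on this machine")
    for name in names:
        print(name)
```

If the file named in the exception shows up on one machine but not on the other, that would be consistent with the explanation above.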

Regards,
Mahesh


From: Gourav Sengupta [mailto:gourav.sengupta@gmail.com]
Sent: Monday, July 31, 2017 9:54 PM
To: Riccardo Ferrari
Cc: user
Subject: Re: SPARK Issue in Standalone cluster

Hi Riccardo,

I am grateful for your kind response.

Also I am sure that your answer is completely wrong and erroneous. SPARK must have a method so that different executors do not pick up the same files to process. You also did not answer the question of why the processing was successful in SPARK 1.5 and not in SPARK 2.2.

Also, the exact same directory is present on both the nodes.

I feel quite fascinated when individuals respond before even understanding the issue or trying out the code.

It will be of great help if someone could kindly read my email and help me figure out the issue.


Regards,
Gourav Sengupta



On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <fe...@gmail.com> wrote:
Hi Gourav,

The issue here is the location you're trying to write to and read from: /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters, all the paths and resources should be available to all executors (and the driver), and that is the reason why you generally use HDFS, S3, NFS or any shared file system.

Spark assumes your data is generally available to all nodes and does not try to pick up the data from a selected node; rather, it tries to write/read in parallel from the executor nodes. Also, given its control logic, there is no way to know (read: you should not care) which executor is doing which task.
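
To make the point about shared storage a bit more concrete, here is a small sketch in plain Python that flags output paths which resolve to the local disk of whichever node opens them. The function name is_node_local and the scheme set are assumptions made for illustration and are not part of Spark. A bare path is treated as local only because no default file system such as HDFS is configured in this setup; an NFS mount present at the same path on every node would also be fine.

```python
from urllib.parse import urlparse

# Schemes that resolve to each machine's own disk. An empty scheme means a
# bare path such as /Users/..., which (as the file:/ prefix in the log shows)
# was resolved against the local file system in this setup.
LOCAL_SCHEMES = {"", "file"}


def is_node_local(path):
    """Return True if path points at the local file system of whichever node opens it."""
    return urlparse(path).scheme in LOCAL_SCHEMES


if __name__ == "__main__":
    for p in [
        "/Users/gouravsengupta/Development/spark/sparkdata/test2",
        "hdfs://namenode.example:8020/sparkdata/test2",  # hypothetical HDFS URI
    ]:
        print(p, "->", "node-local" if is_node_local(p) else "shared")
```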

Hope it helps,
Riccardo


Re: SPARK Issue in Standalone cluster

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Riccardo,

I am grateful for your kind response.

Also I am sure that your answer is completely wrong and erroneous. SPARK
must have a method so that different executors do not pick up the same
files to process. You also did not answer the question of why the
processing was successful in SPARK 1.5 and not in SPARK 2.2.

Also, the exact same directory is present on both the nodes.

I feel quite fascinated when individuals respond before even understanding
the issue or trying out the code.

It will be of great help if someone could kindly read my email and help me
figure out the issue.


Regards,
Gourav Sengupta



On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <fe...@gmail.com>
wrote:

> Hi Gourav,
>
> The issue here is the location you're trying to write to and read from:
> /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
> When dealing with clusters, all the paths and resources should be available
> to all executors (and the driver), and that is the reason why you generally
> use HDFS, S3, NFS or any shared file system.
>
> Spark assumes your data is generally available to all nodes and does not
> try to pick up the data from a selected node; rather, it tries to
> write/read in parallel from the executor nodes. Also, given its control
> logic, there is no way to know (read: you should not care) which executor
> is doing which task.
>
> Hope it helps,
> Riccardo
>
> On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <
> gourav.sengupta@gmail.com> wrote:
>
>> Hi,
>>
>> I am working by creating a native SPARK standalone cluster (
>> https://spark.apache.org/docs/2.2.0/spark-standalone.html)
>>
>> Therefore I  do not have a HDFS.
>>
>>
>> EXERCISE:
>> It's the most fundamental and simple exercise: create a sample SPARK
>> dataframe, write it to a location, and then read it back.
>>
>> SETTINGS:
>> So after I have installed SPARK in two physical systems with the same:
>> 1. SPARK version,
>> 2. JAVA version,
>> 3. PYTHON_PATH
>> 4. SPARK_HOME
>> 5. PYSPARK_PYTHON
>> The user on both systems is the root user, so there are no
>> permission issues anywhere.
>>
>> I am able to start:
>> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
>> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
>> computers)
>>
>> After that I can see in the spark UI (at port 8080) two workers.
>>
>>
>> CODE:
>> Then I run the following code:
>>
>> ======================================================
>> import findspark
>> import os
>> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
>> findspark.init()
>> import pyspark
>> from pyspark.sql import SparkSession
>> spark = (SparkSession.builder
>>         .master("spark://mastersystem.local:7077")
>>         .appName("gouravtest")
>>         .enableHiveSupport()
>>         .getOrCreate())
>> import pandas, numpy
>> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
>> testdf.cache()
>> testdf.count()
>> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
>> spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test2").count()
>> ======================================================
>>
>>
>> ERROR I (in above code):
>> ERROR in line: testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test2")
>> This line does not fail or report any error. But when I look at the stage
>> in the Spark application UI, the error reported for the slave node that is
>> not on the same system as the master node is shown below. The write on the
>> slave node that is on the same physical system as the master happens
>> correctly. (NOTE: here "slave node" basically means the worker and "master
>> node" the driver.)
>> ----------------------------------------------------------------------------------------------------------------------------------
>>
>> 0 (TID 41). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000006_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000006_0: Committed
>> 17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000028_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000028_0: Committed
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000021_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000021_0: Committed
>> 17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 2103 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000018_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000018_0: Committed
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000029_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000029_0: Committed
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000027_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000027_0: Committed
>> 17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID 54). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000010_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010
>> 17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID 52). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000010_0: Committed
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000030_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030
>> 17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID 55). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000030_0: Committed
>> 17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID 53). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID 61). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000016_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000016_0: Committed
>> 17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID 59). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID 51). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000024_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000024_0: Committed
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000023_0' to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000023_0: Committed
>> 17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID 62). 2103 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID 43). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID 49). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID 60). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID 63). 2103 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID 56). 2060 bytes result sent to driver
>> 17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID 57). 2060 bytes result sent to driver
>> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65
>> 17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)
>> 17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3
>> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 24.9 KB, free 365.9 MB)
>> 17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 ms
>> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 70.3 KB, free 365.9 MB)
>> 17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
>> java.io.FileNotFoundException: File file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet does not exist
>> 	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
>> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>> 	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>> 	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
>> 	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
>> 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>> 	at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
>> 	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
>> 	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
>> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)
>> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
>> 	at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
>> 	at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
>> 	at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
>> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> 	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>> 	at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
>> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>> 	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> 	at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
>> 	at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
>> 	at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
>> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
>> 	at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
>> 	at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
>> 	at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
>> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> 	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>> 	at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
>> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>> 	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66
>>
>>
>> ------------------------------------------------------------
>> ----------------------------------------------------------------------
>>
>>
>> ERROR II  (in above code):
>> Reading the files back then throws a separate error, which again says
>> that the files do not exist.
>>
>> Also, why is SPARK searching for the same files on both systems? If the
>> same path on the two systems holds different files, should SPARK not
>> combine them and work on them?
>>
>>
>>
>> NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
>> I started spark using the same method but now using SPARK 1.5 and this
>> does not give any error:
>> ======================================================
>> import findspark
>> import os
>> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Development/spark/spark/'
>> findspark.init()
>> import pyspark
>>
>> sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
>> sqlContext = pyspark.SQLContext(sc)
>> import pandas, numpy
>> testdf = sqlContext.createDataFrame(pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD')))
>> testdf.cache()
>> testdf.count()
>> testdf.write.save("/Users/gouravsengupta/Development/spark/sparkdata/test3")
>> # Spark 1.5 has no SparkSession, so read back through the SQLContext
>> sqlContext.read.load("/Users/gouravsengupta/Development/spark/sparkdata/test3").count()
>> ======================================================
>>
>> I will be sincerely obliged if someone could kindly help me out with this
>> issue and point out my mistakes/ assumptions.
>>
>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>
>

Re: SPARK Issue in Standalone cluster

Posted by Riccardo Ferrari <fe...@gmail.com>.
Hi Gourav,

The issue here is the location you are trying to write to and read from:
/Users/gouravsengupta/Development/spark/sparkdata/test1/p...
When dealing with clusters, all paths and resources should be available
to all executors (and the driver), which is why you generally use
HDFS, S3, NFS or any other shared file system.

Spark assumes your data is available to all nodes and does not try to
pick up the data from a selected node; instead it writes and reads in
parallel from the executor nodes. Also, given its control logic, there is
no way to know (read: you should not care) which executor is doing which
task.
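
To make the point concrete, here is a toy simulation in plain Python (no
Spark involved). It is only a sketch under my own assumptions: the node
names "A" and "B", the helper functions and the task-to-node assignment are
all made up for illustration, and it does not model Spark's real scheduler
or commit protocol. It just shows what happens when every node resolves the
same path against its own local disk, compared with a shared mount.

```python
import os
import tempfile


def write_partitions(node_dirs, assignments, rel_path):
    """Each task writes its part file to the disk of the node it ran on."""
    for part_id, node in assignments.items():
        out_dir = os.path.join(node_dirs[node], rel_path)
        os.makedirs(out_dir, exist_ok=True)
        with open(os.path.join(out_dir, "part-%05d" % part_id), "w") as f:
            f.write("rows of partition %d\n" % part_id)


def list_parts(node_dirs, node, rel_path):
    """List the part files visible from one node."""
    out_dir = os.path.join(node_dirs[node], rel_path)
    return sorted(os.listdir(out_dir)) if os.path.isdir(out_dir) else []


def simulate(shared):
    """Return (files listed on node A, files that node B fails to open)."""
    rel_path = "sparkdata/test"  # hypothetical output location
    with tempfile.TemporaryDirectory() as root:
        if shared:
            # Both nodes see the same storage (stands in for HDFS/NFS/S3).
            node_dirs = {"A": root, "B": root}
        else:
            # Each node has its own private local disk.
            node_dirs = {"A": os.path.join(root, "A"),
                         "B": os.path.join(root, "B")}
        # Hypothetical assignment of write tasks to nodes.
        assignments = {0: "A", 1: "B", 2: "A", 3: "B"}
        write_partitions(node_dirs, assignments, rel_path)

        # The file listing is taken on node A, the read tasks run on node B.
        listed = list_parts(node_dirs, "A", rel_path)
        missing = []
        for name in listed:
            try:
                with open(os.path.join(node_dirs["B"], rel_path, name)) as f:
                    f.read()
            except FileNotFoundError:
                missing.append(name)
        return listed, missing


if __name__ == "__main__":
    print("local disks :", simulate(shared=False))
    print("shared mount:", simulate(shared=True))
```

With private local disks, node A only lists the files its own tasks wrote,
and node B cannot open any of them. With a shared mount all four part files
are listed and readable from either node, which is the situation Spark
expects.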

Hope it helps,
Riccardo
