Posted to user@spark.apache.org by Oleg Ruchovets <or...@gmail.com> on 2014/09/09 18:56:03 UTC

PySpark on Yarn - how group by data properly

Hi,

   I come from a map/reduce background and am trying to do a quite trivial thing:

I have a lot of files (on HDFS) in this format:

   1, 2, 3
   2, 3, 5
   1, 3, 5
   2, 3, 4
   2, 5, 1

  I actually need to group by key (the first column):
  key   values
  1 --> (2,3),(3,5)
  2 --> (3,5),(3,4),(5,1)

  and I need to pass the grouped values to a function f (my custom function);
  the outcome of f() should be written to HDFS with the corresponding key:
    1 --> f() outcome
    2 --> f() outcome.

My code is:

    def doSplit(x):
        # parse "a , b , c" into (key, (v1, v2)); malformed lines return None
        y = x.split(',')
        if len(y) == 3:
            return y[0], (y[1], y[2])


    lines = sc.textFile(filename, 1)
    counts = lines.map(doSplit).groupByKey()
    output = counts.collect()

    for (key, value) in output:
        print 'build model for key ->', key
        print value
        f(str(key), value)


Questions:
   1) lines.map(doSplit).groupByKey() - I didn't find an option like
groupByKey(f()) to process the grouped values. How can I process the grouped
values with a custom function? Function f has some non-trivial logic.

    2) Collecting into output (I really don't like this approach) and passing
it to the function does not look scalable and runs only on one machine. What
is the PySpark way to process the grouped keys in a distributed fashion,
in parallel across the different machines of the cluster?

3) If I process the collected output this way, how can the results be stored on HDFS?

Thanks
Oleg.

Re: PySpark on Yarn - how group by data properly

Posted by Oleg Ruchovets <or...@gmail.com>.
I expanded my data set and am executing PySpark on YARN.
   I noticed that only 2 processes were processing the data:

14210 yarn      20   0 2463m 2.0g 9708 R 100.0  4.3   8:22.63 python2.7
32467 yarn      20   0 2519m 2.1g 9720 R 99.3  4.4   7:16.97   python2.7


*Question:*
   *How can I configure PySpark to have more processes working on the data?*


Here is my command:

      [hdfs@UCS-MASTER cad]$
/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit
--master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g
--py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py
/input/tad/data.csv /output/cad_model_500_1

I tried playing with num-executors and executor-cores, but it is still 2
python processes doing the job. I have a 5-machine cluster with 32 GB RAM.
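
(A possible factor, an assumption not confirmed anywhere in this thread: the
number of concurrent python workers in a stage is bounded by the number of
tasks, i.e. the number of partitions, and the code in the first mail reads the
file with sc.textFile(filename, 1). A minimal sketch of asking for more
partitions, with the partition count 24 picked arbitrarily:

    # hypothetical tweak: request more input splits so more executors get tasks
    lines = sc.textFile(filename, 24)
    # and more reduce-side partitions for the shuffle
    grouped = lines.map(doSplit).groupByKey(numPartitions=24)

With only a couple of tasks per stage, --num-executors and --executor-cores
alone cannot add parallelism.)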

console output:


14/09/16 20:07:34 INFO spark.SecurityManager: Changing view acls to: hdfs
14/09/16 20:07:34 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hdfs)
14/09/16 20:07:34 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/16 20:07:35 INFO Remoting: Starting remoting
14/09/16 20:07:35 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/16 20:07:35 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/16 20:07:35 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140916200735-53f6
14/09/16 20:07:35 INFO storage.MemoryStore: MemoryStore started with
capacity 2.3 GB.
14/09/16 20:07:35 INFO network.ConnectionManager: Bound socket to port
37255 with id = ConnectionManagerId(UCS-MASTER.sms1.local,37255)
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Trying to register
BlockManager
14/09/16 20:07:35 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:37255 with 2.3 GB RAM
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:55286
14/09/16 20:07:35 INFO broadcast.HttpBroadcast: Broadcast server started at
http://10.193.218.2:55286
14/09/16 20:07:35 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-ca8193be-9148-4e7e-a0cc-4b6e7cb72172
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:38065
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
14/09/16 20:07:35 INFO ui.SparkUI: Started SparkUI at
http://UCS-MASTER.sms1.local:4040
14/09/16 20:07:36 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
--args is deprecated. Use --arg instead.
14/09/16 20:07:36 INFO impl.TimelineClientImpl: Timeline service address:
http://UCS-NODE1.sms1.local:8188/ws/v1/timeline/
14/09/16 20:07:37 INFO client.RMProxy: Connecting to ResourceManager at
UCS-NODE1.sms1.local/10.193.218.3:8050
14/09/16 20:07:37 INFO yarn.Client: Got Cluster metric info from
ApplicationsManager (ASM), number of NodeManagers: 5
14/09/16 20:07:37 INFO yarn.Client: Queue info ... queueName: default,
queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
      queueApplicationCount = 0, queueChildQueueCount = 0
14/09/16 20:07:37 INFO yarn.Client: Max mem capabililty of a single
resource in this cluster 53248
14/09/16 20:07:37 INFO yarn.Client: Preparing Local resources
14/09/16 20:07:37 WARN hdfs.BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
14/09/16 20:07:37 INFO yarn.Client: Uploading
file:/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
14/09/16 20:07:39 INFO yarn.Client: Uploading
file:/usr/lib/cad/PrepareDataSetYarn.py to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/PrepareDataSetYarn.py
14/09/16 20:07:39 INFO yarn.Client: Uploading file:/usr/lib/cad/tad.zip to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/tad.zip
14/09/16 20:07:39 INFO yarn.Client: Setting up the launch environment
14/09/16 20:07:39 INFO yarn.Client: Setting up container launch context
14/09/16 20:07:39 INFO yarn.Client: Command for starting the Spark
ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx4096m,
-Djava.io.tmpdir=$PWD/tmp,
-Dspark.tachyonStore.folderName=\"spark-7543138b-c763-4a6b-abff-81b8dd1bd639\",
-Dspark.executor.memory=\"2g\", -Dspark.executor.instances=\"12\",
-Dspark.yarn.dist.files=\"file:/usr/lib/cad/PrepareDataSetYarn.py,file:/usr/lib/cad/tad.zip\",
-Dspark.yarn.secondary.jars=\"\",
-Dspark.submit.pyFiles=\"/usr/lib/cad/tad.zip\",
-Dspark.driver.host=\"UCS-MASTER.sms1.local\", -Dspark.app.name=\"CAD\",
-Dspark.fileserver.uri=\"http://10.193.218.2:38065\",
-Dspark.master=\"yarn-client\", -Dspark.driver.port=\"39379\",
-Dspark.executor.cores=\"4\", -Dspark.httpBroadcast.uri=\"
http://10.193.218.2:55286\",
 -Dlog4j.configuration=log4j-spark-container.properties,
org.apache.spark.deploy.yarn.ExecutorLauncher, --class, notused, --jar ,
null,  --args  'UCS-MASTER.sms1.local:39379' , --executor-memory, 2048,
--executor-cores, 4, --num-executors , 12, 1>, <LOG_DIR>/stdout, 2>,
<LOG_DIR>/stderr)
14/09/16 20:07:39 INFO yarn.Client: Submitting application to ASM
14/09/16 20:07:39 INFO impl.YarnClientImpl: Submitted application
application_1409564765875_0046
14/09/16 20:07:39 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: -1
 appStartTime: 1410869259760
 yarnAppState: ACCEPTED

14/09/16 20:07:40 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: -1
 appStartTime: 1410869259760
 yarnAppState: ACCEPTED

14/09/16 20:07:41 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: -1
 appStartTime: 1410869259760
 yarnAppState: ACCEPTED

14/09/16 20:07:42 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: -1
 appStartTime: 1410869259760
 yarnAppState: ACCEPTED

14/09/16 20:07:43 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: 0
 appStartTime: 1410869259760
 yarnAppState: RUNNING

14/09/16 20:07:45 INFO cluster.YarnClientClusterScheduler:
YarnClientClusterScheduler.postStartHook done
14/09/16 20:07:45 INFO storage.MemoryStore: ensureFreeSpace(85685) called
with curMem=0, maxMem=2470025625
14/09/16 20:07:45 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 83.7 KB, free 2.3 GB)
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.tip.id is
deprecated. Instead, use mapreduce.task.id
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.id is
deprecated. Instead, use mapreduce.task.attempt.id
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.is.map is
deprecated. Instead, use mapreduce.task.ismap
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.partition is
deprecated. Instead, use mapreduce.task.partition
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.job.id is
deprecated. Instead, use mapreduce.job.id
14/09/16 20:07:46 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:38745/user/Executor#-700940149]
with ID 2
14/09/16 20:07:46 INFO spark.SparkContext: Starting job: saveAsTextFile at
NativeMethodAccessorImpl.java:-2
14/09/16 20:07:46 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE1.sms1.local:39103 with 1178.1 MB RAM
14/09/16 20:07:46 INFO mapred.FileInputFormat: Total input paths to process
: 1
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Registering RDD 3 (RDD at
PythonRDD.scala:252)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at
NativeMethodAccessorImpl.java:-2) with 2 output partitions
(allowLocal=false)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Final stage: Stage
0(saveAsTextFile at NativeMethodAccessorImpl.java:-2)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Parents of final stage:
List(Stage 1)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Missing parents: List(Stage
1)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Submitting Stage 1
(PairwiseRDD[3] at RDD at PythonRDD.scala:252), which has no missing parents
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Submitting 3 missing tasks
from Stage 1 (PairwiseRDD[3] at RDD at PythonRDD.scala:252)
14/09/16 20:07:46 INFO cluster.YarnClientClusterScheduler: Adding task set
1.0 with 3 tasks
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID
0 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as
3895 bytes in 2 ms
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID
1 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as
3895 bytes in 0 ms
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:2 as TID
2 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:2 as
3895 bytes in 0 ms
14/09/16 20:07:46 INFO util.RackResolver: Resolved UCS-NODE1.sms1.local to
/default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE2.sms1.local:40219/user/Executor#36134446]
with ID 3
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE2.sms1.local to
/default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE2.sms1.local:37085/user/Executor#-199925854]
with ID 8
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:50805/user/Executor#-1155334234]
with ID 6
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE2.sms1.local:56453 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE3.sms1.local:41585/user/Executor#2145905321]
with ID 5
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE3.sms1.local to
/default-rack
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE2.sms1.local:37911 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE1.sms1.local:50858 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE3.sms1.local:59208/user/Executor#-1289679344]
with ID 10
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE3.sms1.local:40785 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:59378/user/Executor#-1053652401]
with ID 4
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE4.sms1.local to
/default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:57503/user/Executor#-1979237887]
with ID 9
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE3.sms1.local:37476 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE4.sms1.local:52439 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE4.sms1.local:36292 with 1178.1 MB RAM
14/09/16 20:07:48 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:41792/user/Executor#-69338597]
with ID 12
14/09/16 20:07:48 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE1.sms1.local:59539 with 1178.1 MB RAM
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:60623/user/Executor#361407816]
with ID 11
14/09/16 20:07:49 INFO util.RackResolver: Resolved UCS-MASTER.sms1.local to
/default-rack
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:44955/user/Executor#389344699]
with ID 1
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:54092/user/Executor#-1381396123]
with ID 7
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:52854 with 1178.1 MB RAM
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:50766 with 1178.1 MB RAM
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:55993 with 1178.1 MB RAM
14/09/16 20:08:58 INFO scheduler.TaskSetManager: Finished TID 2 in 71856 ms
on UCS-NODE1.sms1.local (progress: 1/3)
14/09/16 20:08:58 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1,
2)
14/09/16 20:11:37 INFO cluster.YarnClientSchedulerBackend: Executor 2
disconnected, so removing it
14/09/16 20:11:37 ERROR cluster.YarnClientClusterScheduler: Lost executor 2
on UCS-NODE1.sms1.local: remote Akka client disassociated
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Re-queueing tasks for 2
from TaskSet 1.0
14/09/16 20:11:37 INFO scheduler.DAGScheduler: Resubmitted
ShuffleMapTask(1, 2), so marking it as still running
14/09/16 20:11:37 WARN scheduler.TaskSetManager: Lost TID 1 (task 1.0:1)
14/09/16 20:11:37 WARN scheduler.TaskSetManager: Lost TID 0 (task 1.0:0)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID
3 on executor 9: UCS-NODE4.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as
3895 bytes in 0 ms
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID
4 on executor 5: UCS-NODE3.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 0)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as
3895 bytes in 0 ms
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:2 as TID
5 on executor 1: UCS-MASTER.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO storage.BlockManagerMasterActor: Trying to remove
executor 2 from BlockManagerMaster.
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:2 as
3895 bytes in 0 ms
14/09/16 20:11:37 INFO storage.BlockManagerMaster: Removed 2 successfully
in removeExecutor
14/09/16 20:11:37 INFO scheduler.Stage: Stage 1 is now unavailable on
executor 2 (0/3, false)
14/09/16 20:11:53 INFO cluster.YarnClientSchedulerBackend: Registered
executor: Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:47948/user/Executor#-1547490738]
with ID 13
14/09/16 20:11:53 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE4.sms1.local:51174 with 1178.1 MB RAM
14/09/16 20:12:19 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1,
2)
14/09/16 20:12:19 INFO scheduler.TaskSetManager: Finished TID 5 in 41426 ms
on UCS-MASTER.sms1.local (progress: 1/3)
14/09/16 20:14:23 INFO scheduler.TaskSetManager: Finished TID 3 in 165752
ms on UCS-NODE4.sms1.local (progress: 2/3)
14/09/16 20:14:23 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1,
1)
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1,
0)
14/09/16 20:14:27 INFO scheduler.TaskSetManager: Finished TID 4 in 170168
ms on UCS-NODE3.sms1.local (progress: 3/3)
14/09/16 20:14:27 INFO cluster.YarnClientClusterScheduler: Removed TaskSet
1.0, whose tasks have all completed, from pool
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Stage 1 (RDD at
PythonRDD.scala:252) finished in 401.305 s
14/09/16 20:14:27 INFO scheduler.DAGScheduler: looking for newly runnable
stages
14/09/16 20:14:27 INFO scheduler.DAGScheduler: running: Set()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/09/16 20:14:27 INFO scheduler.DAGScheduler: failed: Set()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Missing parents for Stage 0:
List()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Submitting Stage 0
(MappedRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:-2), which
is now runnable
14/09/16 20:14:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
from Stage 0 (MappedRDD[8] at saveAsTextFile at
NativeMethodAccessorImpl.java:-2)
14/09/16 20:14:28 INFO cluster.YarnClientClusterScheduler: Adding task set
0.0 with 2 tasks
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
6 on executor 8: UCS-NODE2.sms1.local (PROCESS_LOCAL)
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
17714 bytes in 0 ms
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
7 on executor 6: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
17714 bytes in 1 ms
14/09/16 20:14:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@UCS-NODE1.sms1.local:54238
14/09/16 20:14:28 INFO spark.MapOutputTrackerMaster: Size of output
statuses for shuffle 0 is 184 bytes
14/09/16 20:14:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@UCS-NODE2.sms1.local:43725

Thanks
Oleg.

On Wed, Sep 10, 2014 at 1:48 AM, Davies Liu <da...@databricks.com> wrote:

> On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets <or...@gmail.com>
> wrote:
> > Hi ,
> >
> >    I came from map/reduce background and try to do quite trivial thing:
> >
> > I have a lot of files ( on hdfs ) - format is :
> >
> >    1 , 2 , 3
> >    2 , 3 , 5
> >    1 , 3,  5
> >     2, 3 , 4
> >     2 , 5, 1
> >
> >   I am actually need to group by key (first column) :
> >   key   values
> >   1 --> (2,3),(3,5)
> >   2 --> (3,5),(3,4),(5,1)
> >
> >   and I need to process (pass)  values to the function f ( my custom
> > function)
> >   outcome of  function f()  should be  to hdfs with corresponding key:
> >     1 --> f() outcome
> >     2 --> f() outcome.
> >
> > My code is :
> >
> >       def doSplit(x):
> >         y = x.split(',')
> >         if(len(y)==3):
> >            return  y[0],(y[1],y[2])
> >
> >
> >     lines = sc.textFile(filename,1)
> >     counts = lines.map(doSplit).groupByKey()
> >     output = counts.collect()
> >
> >     for (key, value) in output:
> >         print 'build model for key ->' , key
> >         print value
> >         f(str(key) , value))
> >
> >
> > Questions:
> >    1) lines.map(doSplit).groupByKey() - I didn't  find the option to use
> > groupByKey( f() ) to process grouped values? how can I process grouped
> keys
> > by custom function? function f has some not trivial logic.
>
> The result of groupByKey() is still RDD with (key, ResultIterable(values)),
> so you can continue to call map() or mapValues() on it:
>
> lines.map(doSplit).groupByKey().map(f)
>
> But your `f` need two parameters, the map() will assume that `f`
> take one parameter, so you need to build a wrapper for `f`:
>
> lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))
>
> If the `f` only accept values as list, then you need to convert `vs` into
> list:
>
> result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k,
> list(vs)))
>
> finally, you could save the `result` into HDFS:
>
> result.saveAsPickleFile(path, batch=1024)
>
> >     2) Using output ( I really don't like this approach )  to pass to
> > function looks like not scalable and executed only on one machine?  What
> is
> > the way using PySpark process grouped keys in distributed fashion.
> > Multiprocessing and on different machine of the cluster.
> >
> > 3)In case of  processing output how data can be stored on hdfs?
>
> Currently, it's not easy to access files in HDFS, you could do it by
>
> sc.parallelize(local_data).map(str).saveAsTextFile()
>
> > Thanks
> > Oleg.

Re: PySpark on Yarn - how group by data properly

Posted by Davies Liu <da...@databricks.com>.
On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets <or...@gmail.com> wrote:
> Hi ,
>
>    I came from map/reduce background and try to do quite trivial thing:
>
> I have a lot of files ( on hdfs ) - format is :
>
>    1 , 2 , 3
>    2 , 3 , 5
>    1 , 3,  5
>     2, 3 , 4
>     2 , 5, 1
>
>   I am actually need to group by key (first column) :
>   key   values
>   1 --> (2,3),(3,5)
>   2 --> (3,5),(3,4),(5,1)
>
>   and I need to process (pass)  values to the function f ( my custom
> function)
>   outcome of  function f()  should be  to hdfs with corresponding key:
>     1 --> f() outcome
>     2 --> f() outcome.
>
> My code is :
>
>       def doSplit(x):
>         y = x.split(',')
>         if(len(y)==3):
>            return  y[0],(y[1],y[2])
>
>
>     lines = sc.textFile(filename,1)
>     counts = lines.map(doSplit).groupByKey()
>     output = counts.collect()
>
>     for (key, value) in output:
>         print 'build model for key ->' , key
>         print value
>         f(str(key) , value))
>
>
> Questions:
>    1) lines.map(doSplit).groupByKey() - I didn't  find the option to use
> groupByKey( f() ) to process grouped values? how can I process grouped keys
> by custom function? function f has some not trivial logic.

The result of groupByKey() is still an RDD of (key, ResultIterable(values))
pairs, so you can continue to call map() or mapValues() on it:

lines.map(doSplit).groupByKey().map(f)

But your `f` needs two parameters, while map() will assume that `f`
takes one parameter, so you need to build a wrapper for `f`:

lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))

If `f` only accepts the values as a list, then you need to convert `vs` into a list:

result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs)))

Finally, you could save the `result` into HDFS:

result.saveAsPickleFile(path, batch=1024)
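
To read it back later, a sketch (assuming the same `path` and a live
SparkContext `sc`):

    # load the pickled (key, values) pairs back as an RDD
    restored = sc.pickleFile(path)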

>     2) Using output ( I really don't like this approach )  to pass to
> function looks like not scalable and executed only on one machine?  What is
> the way using PySpark process grouped keys in distributed fashion.
> Multiprocessing and on different machine of the cluster.
>
> 3)In case of  processing output how data can be stored on hdfs?

Currently, it's not easy to access files in HDFS; you could do it by:

sc.parallelize(local_data).map(str).saveAsTextFile(path)
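
Putting the pieces above together, a minimal end-to-end sketch (run_f,
output_path and the tab-separated output format are assumptions for
illustration; `f` is the custom function from the first mail, which is not
shown here):

    def doSplit(x):
        # parse "a , b , c" into (key, (v1, v2)); malformed lines return None
        y = x.split(',')
        if len(y) == 3:
            return y[0], (y[1], y[2])

    def run_f(kv):
        # unpack the (key, grouped values) pair and call the custom f()
        k, vs = kv
        return str(k) + '\t' + str(f(k, list(vs)))

    lines = sc.textFile(filename)
    pairs = lines.map(doSplit).filter(lambda p: p is not None)
    result = pairs.groupByKey().map(run_f)
    result.saveAsTextFile(output_path)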

> Thanks
> Oleg.
