You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/12 22:06:52 UTC

[GitHub] [spark] viirya opened a new pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

viirya opened a new pull request #32136:
URL: https://github.com/apache/spark/pull/32136


   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   This patch proposes to add a plugin API for providing scheduling suggestion to Spark task scheduler.
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   Spark scheduler schedules tasks to executors in an arbitrary (maybe not accurate description, but I cannot find good term here) way. The schedule schedules the tasks by itself. Although there is locality configuration, the configuration is used for data locality purposes. Generally we cannot suggest the scheduler where a task should be scheduled to. Normally it is not a problem because the general task is executor-agnostic.
   
   But for special tasks, for example stateful tasks in Structured Streaming, state store is maintained at the executor side. Changing task location means reloading checkpoint data from the last batch. It has disadvantages from the performance perspective and also casts some limitations when we want to implement somes features in Structured Streaming.
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   No. This API should be developer-only and currently for Spark internal only too.
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   -->
   
   Unit test. Will add more tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824218678


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42266/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-833845738


   Note: I updated the doc with more details on goals, non-goals, use-case, deficiencies of current solution (locality), etc. Hopefully it helps others understand the proposal more.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826148020


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137892/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824009507


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137729/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846696394


   > My major point is about the characteristic of the checkpoint location.
   > 
   > We require checkpoint location to be "fault-tolerant" including hardware failures (local storage doesn't make sense here), and provide "high availability" by itself so that Spark can delegate such complexity to the checkpoint location. For sure, such requirement leads underlying file system to be heavy and non-trivial to maintain, but IMHO that's not an enough reason to take the complexity back to Spark, because:
   
   I think the users face one major issue is, they don't have choice. For "fault-tolerant", as we consider PVC as an abstract way to look at storage, it can support that if the storage class supports the feature. Actually there is storage class supporting that. Again, it is about user-choice. Users can choose from different storage classes for PVC. How often the fault can occur and how serious a fault could be for the streaming app? Not to mention there is also snapshot support for volumes on K8S. From less to more, users can choose different storage classes to meet their requirements.
   
   For example, for a streaming app that fault may not be too serious issue, maybe local storage + occasional snapshot or local storage with raid may be good enough?
   
   For industry usage, sometimes it is not easy to ask whatever file system to use, e.g. Object stores in Azure or GCS or others, if the users want. Any backend file system adoption requires organization change, talent hiring, system engineering team support, policy change, etc.
   
   > I'd interpret the reasons as two folds:
   > 
   > A. Majority of real-world workloads are working well with current technology
   > B. Some workloads don't work well, but no strong demand on this as possible issues are tolerable
   
   I don't want to guess it here, but maybe another one possibility, they are moved to other streaming engine which can support their workloads easily.
   
   > I'd be happy to see the overall system design and the result of POC. Let's continue the talk about PVC once we get the details.
   
   Sure. This stage-level scheduling is a different direction than my original proposal. I need to take some time on revising it. I will keep it posted in other place e.g. new JIRA.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846358670


   > I'm not sure about the scenario of leveraging PVC as checkpoint location - at least that sounds to me as beyond the support of checkpoint in Structured Streaming.
   
   I agree on this, and yes, this is the current status. So that is said we are going to propose a new approach to support checkpoint in Structured Streaming. Unfortunately due to that fact that scheduling is bound to stateful tasks (i.e. state store locations), we cannot achieve the goal without touching other modules, like core.
    
   > I'm more likely novice on cloud/k8s, but from the common sense, I guess the actual storage of PVC should be still a sort of network storage to be resilient on "physical node down". I'm wondering how much benefits PVC approach gives compared to the existing approach as just directly use remote fault-tolerant file system. The benefits should be clear to cope with additional complexity.
   
   Technically, PVC is kinds of abstract way to look at the volume mounted on container running executor. It could be local storage on nodes on k8s. It depends where the PVC is bound to.
   
   HDFS becomes a bottleneck for our streaming jobs. The throughput to HDFS, the number of files as loading on name nodes, these are serious issues to use it as checkpoint destination for heavy streaming jobs in scale. Using PVC as checkpoint could be huge relief on the loading of HDFS. There are also others like better latency, simplified streaming architecture. Personally I think this is enough benefits as the motivation of our proposal.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-854768806


   > if user can't specify it themselves with stage level api, you are saying Spark would internally do it for the user?
   
   Yes. And we can add a new conf for users to control the behavior.
   
   > so essentially this is extending the locality feature and then the only thing you would need in stage level scheduling api is ability to say use this new locality algorithm for this stage?
   
   Yes. I'm thinking a bit more: we probably even don't need the stage level scheduling API ability. After knowing the "mapping", we can use it directly in `resourcesMeetTaskRequirements`. The "mapping" is actually a hard-coded task requirement, and use stage level scheduling API ability to specify that requirement looks redundant and unnecessary. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819097991


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41887/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824212159


   Kubernetes integration test unable to build dist.
   
   exiting with code: 1
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42266/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819332344


   > ... So (a) looks like using locality for state store location and (b) is that locality cannot guarantee actual location. Right? 
   
   Yes.
   
   > 1. it uses previous state store location as locality, so if no previous location info, we still let Spark pick up executor arbitrarily.
   
   So, how would the plugin help when there's no previous location info? 
   
   > 2. It depends on initial chosen executor-state store mapping. So if Spark choose a sub-optimal mapping, locality doesn't work well for later batches. 
   
   I think this is the point that matches my point of case b above.
   
   But looking at the code, it seems the plugin is still applied after locality scheduling? 
   
   > 3. Forcibly assigning state stores to executors can possibly lead to unreasonable scheduling decision. For example, we don't know if the executor satisfy resource requirement.
   
   I don't get this. Do you mean some executors may not be suitable for having a state store?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821926941


   **[Test build #137520 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137520/testReport)** for PR 32136 at commit [`7704499`](https://github.com/apache/spark/commit/77044990bcad603b750f9a3ed2c484fd28da7287).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826124648


   **[Test build #137892 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137892/testReport)** for PR 32136 at commit [`173bb07`](https://github.com/apache/spark/commit/173bb07a9251141f7081ed8cd60c9adae3c566bd).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818285914


   **[Test build #137245 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137245/testReport)** for PR 32136 at commit [`ae9a8cb`](https://github.com/apache/spark/commit/ae9a8cb78635386a3c57ef71e27138196e671f2f).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821305514


   > I didn't see an API for this? informScheduledTask is called at scheduling so my assumption is the plugin may be keeping state about where things have went and if things reset I would have expected an API call to let the plugin know. in TaskSchedulerImpl.resourceOffers it may have assigned some tasks but then if it doesn't get all needed for barrier then it resets them. Maybe your intention is that it doesn't keep state? The intention is that api would just throw?
   
   This is good point. I agree we may need an API to be informed that the task assignment was reset. For my user-case, it is not related to barrier tasks, but you're right that if the plugin works with barrier tasks, it should be informed in case it keeps some states.
     
   > so I don't completely understand this. Are you just saying that the locality is not specific enough? I get the first micro-batch case kind of especially perhaps in the dynamic allocation type case - is that the case here, seems like you kind of hint at it above in a comment, but don't understand in other cases. Have you tried the newer locality algorithm vs the old one?
   > 
   > Does this come down to you really just want scheduler to force evenly distributed and then after that locality should work? It seems like you are saying it needs more then that though and locality isn't enough, would like to understand why.
   
   > > non-trivial locality config (e.g., 10h)
   > 
   > I'm not sure what that means? do you just mean it has more logic in figuring out the locality?
   
   For the other cases, I think generally locality should work for the purpose. It is how we make state store location stable right now, using locality. In the above comment, it is more like a guess that I'm wondering if a global locality that we need for forcing state store location unchanged across micro-batches, would also be suitable for other stages which no state store is involved.
   
   Because to force state store location unchanged, it means we need a long-enough locality value to ask Spark to delay scheduling stateful tasks. But as we know it may not respect cluster resource utilization. So it could be bad for other stages which do not have state store.
   
   > Overall I'm fine with having some sort of a plugin to allow people to experiment but I also want it generic enough to cover the cases I mentioned and for it to not cause problems where people can shoot themselves and then complain as to why things aren't working. It would be nice to really understand this case to see if that is needed or if just something else can be improved for all people benefit.
   
   Sure. Appreciate your comment and questions.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a change in pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r633357559



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       If it is not to be exposed, then why make it a plugin? Isn't the risk that you are now overgeneralizing something for a non-existent benefit?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       Can't we add a locality constraints that forces you to schedule on an executor instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-864257650


   Oh, we can continue the discussion here too if you prefer. I just think that as we are not to review the code here, maybe it is good to close the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819661211


   > I think the scheduler already distributes tasks evenly when there's no locality preference as we'll shuffle the executors before scheduling:
   > 
   > https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L535
   > 
   > Doesn't it work?
   
   For the code snippet, doesn't it depend on if all executors are available during the moment of making offers? For example, running SS job with state stores, it is easier to see the initial tasks are scheduled to part of all executors.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-844858158


   > However I'm not sure if stage level scheduling can deal with executor lost case. Based on above comment, seems it cannot. That will be a major concern for the use-case here. During the task scheduling, once an executor is lost, we may need the scheduler be able to re-schedule the task to a particular executor (e.g. reused PVC in our case).
   
   So what if the state store resource is **required** not **optional**? It means, the task won't launch until getting the required state store. So in your PVC case, the task will wait until it re-mount to some executors. And if we make state store resource required, we should do the similar thing for the HDFS state store on executor lost. For example, we should reconstruct the state store on other active executors (or even we don't have to reconstruct the state store in reality but move the `StateStoreProviderId`s to other active executors' metadata (e.g., ExecutorData) should be enogh) so that the state store resources always exist and scheduling won't hang. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-859591939


   yeah it would be great to have a summary and also please describe exactly the flow when an executor is lost and how everything is updated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826562394


   Hmm, does it mean this is an invalid requirement for Spark and we may not address it? I hope not the case.
   
   I totally agree that there are many aspects needed to be carefully looked at.  We should look at these aspects specially.
   
   Let me figure out. Is this a real issue? Yes. As our production applications face checkpoint related issues, the capacity to make state store location stable is one step towards fixing the issues. Should we address the issues? I think so, except that we don't expect to improve Spark used in streaming use-cases. Is this interface good enough? Maybe not, I'm open to hear suggestions about how/what we can make it better/useful to us and others.
   
   
    
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-857298601


   > Could you post the code snapshot?
   
   E.g.,
   
   ```scala
   ResourceProfileManager {
     private[spark] def isSupported(rp: ResourceProfile): Boolean = {
       ...
       val YarnOrK8sNotDynAllocAndNotDefaultProfile =
         isNotDefaultProfile && (isYarn || isK8s) && !dynamicEnabled  // <= Remove it?
       ...
   }
   ```
   
   >  Add a new type of task location, e.g., StateStoreTaskLocation(host, executorId, StateStoreProviderId) , and let BaseStateStoreRDD.getPreferredLocations returns it in string. Then, the TaskSetManager could establish the “mapping” while building the pending task list:
   
   Isn't it the mapping still executor id <-> statestore? Executor id could be changed due to executor lost. More robust mapping, e.g. for our use-case, might be PVC id <-> statestore.
   
   > I agree with this, its a matter of coming up with the right design to solve the problem and possibly others (in the case of plugin). If we discuss alternatives that become to complex we should drop them. But we should have the discussion like we are.
   
   Yes, agree. Appreciate for the discussion.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820828437


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42023/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818919213


   > I don't think we can guarantee it. It's a best effort and tasks should be able to run on any executor, thought tasks can have preferred executors (locality). Otherwise, we need to revisit many design decisions like how to avoid infinite wait, how to auto-scale, etc.
   
   SS is special use-case, especially for stateful tasks. You cannot scale the cluster like normal batch job.
   
   > Can you elaborate? If it's a problem of delay scheduling let's fix it instead.
   
   In #30812, @zsxwing, @HeartSaVioR and me have a long discussion around using locality for stateful tasks. You can see my original approach is to use locality, but overall it is considered too hacky and now I share the points from them. You may catch up the comments there.
   
   Basically I think the problem is, for a stateful job that we want to evenly distribute tasks to all executors and let the executor-task mapping relatively stable. With locality, we can only assign tasks to executors blindly. For example, the scheduler knows more about executor capacity and knows what executors should be assigned with tasks. But in SS, we don't have such info (and should not have it too).
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xuanyuanking commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-844712368


   After reviewing the SPIP doc and the WIP implementation, I think it's a reasonable requirement and scenario for SS. Since the API will be added to Spark Core, we might need extra confirmation from the others to make sure there's no side effect. cc @Ngone51 and @jiangxb1987 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-823944692


   **[Test build #137729 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137729/testReport)** for PR 32136 at commit [`29c0b6b`](https://github.com/apache/spark/commit/29c0b6b6495fa4366129f0a086d72deed8a985ca).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826133080


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42417/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824294219


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137741/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-858770342


   > And to ensure we're on the same page - for the `RequiredResourceLocation`, how would you provide the PVC info there? IIUC, you want to put the PVC info there, right?
   
   Yea, I have not tried yet as we are still in discussion phase. But my idea is to retrieve PVC info from scheduler backend (k8s) when it retrieves executor info. I guess it doesn't return such info now.
   
   So in state store rdd, when preparing preferred locations, it queries scheduler backend (if it is k8s) to get PVC info and fill into `RequiredResourceLocation`. `RequiredResourceLocation` should be general enough. So the resource requirement might be a map like resource -> resource id.
   
   At task scheduler side, during scheduling task set, it looks at required resources to meet task requirement in the location.
   
   Sounds okay?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-858718082


   I see.
   
   And to ensure we're on the same page - for the `RequiredResourceLocation`, how would you provide the PVC info there? IIUC, you want to put the PVC info there, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845771355


   > Let me know if I misunderstand it. So I think the idea is, for example, we go to set a flag at the task set of first micro batch. The flag tells Spark scheduler that we want this task set to be evenly distributed. Is it?
   
   Yes.
   
   > Actually it sounds hacky to me. But as I'm not used to work in core module, so if you and others working in core frequently agree with it. I'm okay for this as a solution to the first micro-batch issue.
   
   It's true it's kind of hacky..but it's a compromise way that allows us to move forward (assuming people still hold the negative willingness towards the plugin API).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824008599


   **[Test build #137729 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137729/testReport)** for PR 32136 at commit [`29c0b6b`](https://github.com/apache/spark/commit/29c0b6b6495fa4366129f0a086d72deed8a985ca).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846346176


   Please correct me if I'm missing here.
   
   If I understand correctly about stage level scheduling, you still need to specify "all" resources needed for "all" tasks in StateRDD; while that may block Spark to schedule when some resources are missing (like lost executor with PVC), I'm wondering how task level schedule would work as its intention. After this, locality is the only one we can deal with, and it's not an enforcement so we're back to the origin problem.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-847128813


   thanks for explaining some of the SS specific things.
   
   the stage level scheduling changes you are proposing are definitely not what it does now and we would definitely need a few features we were wanting to do anyway, like reusing existing containers, but that is all doable just needs to be implemented.  it sounds like really this would be a special stage level scheduling feature that is SS state store specific, because user would not specify an exact requirement just that tasks must be scheduling on executors with its specific state store. This means that if an executor goes down it has to wait for something else on executor to start up the task specific 
   state store - what is going to do that in this scenario?  Or you wait a certain period and schedule it anywhere.
   
   it seems like this feature could existing outside creating a new ResourceProfile with the stage level scheduling api's and user should be able to specify this option that would only work with stateStoreRDD.  Is it useful outside of that?   I don't see how unless we added another plugin point for executor to report back any resource and then come up with some api that it could call to do the mapping of taskId to resourceId reported back to it.
   
   > To follow the existing custom resource management, the state store resource might be maintained as ("statestore" -> list of statestore ids) as a part of the custom resource.
   
   I think this would mean scheduler would have some specific logic to be able to match task id to state store id, right?  Otherwise stage level scheduling would schedule a task on anything in that list., which seems like at that point makes a list not relavent if Spark knows how to do some sort of mapping.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-857475125


   > Isn't it the mapping still executor id <-> statestore? Executor id could be changed due to executor lost. More robust mapping, e.g. for our use-case, might be PVC id <-> statestore.
   
   
   The mapping between executor id and statestore is necessarily needed. And it can be achieved by the existing framework - `ExecutorData.resourcesInfo`, so it won't be a problem. But the "mapping" between tasks and the specific resources (statestore in this case) is a new requirement that we have to add.
   
   As mentioned above, we probably even don't need the stage level scheduling API ability if we follow the `StateStoreTaskLocation` solution. And, then, the `isSupport` checking also won't be a problem.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824422185


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42278/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824401633


   **[Test build #137751 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137751/testReport)** for PR 32136 at commit [`d2a2b46`](https://github.com/apache/spark/commit/d2a2b46c5b3b68a30cddc43e9246e610c52e90f3).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845740021


   > No. In our use-case, we want to get rid of HDFS for state store checkpoint. So the task will wait until the PVC re-mounts to another new executor. Our state store is checkpointed to PVC, not HDFS.
   
   Not sure which point you says "No" to..IIUC, I think stage level scheduling should work for your use case as long as you would let the driver know when the PVC re-mounts to another new executor (just reports the state stores ids related to that PVC).  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820814520


   > Correct me if I'm wrong: Spark tries its best to schedule SS tasks on executors that have existing state store data. This is already the case and is implemented via the preferred location. The problem we are solving here is the first micro-batch, where there is no existing state store data and we want to schedule the tasks of the first micro-batch evenly on the cluster. This is to avoid skews in the future that many SS tasks are running on very few executors.
   
   That is correct. However, even for not first micro-batch, we currently use preferred location + non-trivial locality config (e.g., 10h) to force Spark schedule tasks to previous locations. I think it is not flexible because locality is a global setting. A non-trivial locality config might cause sub-optimal result for other stages. And, it requires end-users to set it. It makes me feel that it is not a user-friendly approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818354593


   **[Test build #137245 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137245/testReport)** for PR 32136 at commit [`ae9a8cb`](https://github.com/apache/spark/commit/ae9a8cb78635386a3c57ef71e27138196e671f2f).
    * This patch **fails PySpark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819661211


   > I think the scheduler already distributes tasks evenly when there's no locality preference as we'll shuffle the executors before scheduling:
   > 
   > Doesn't it work?
   
   For the code snippet, doesn't it depend on if all executors are available during the moment of making offers? For example, running SS job with state stores, it is easier to see the initial tasks are scheduled to part of all executors.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-857846015


   > But the "mapping" between tasks and the specific resources (statestore in this case) is a new requirement that we have to add.
   
   Hm? We don't need mapping between tasks <-> statestore. Do you mean PVC id <-> statestore? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826506510


   As I mentioned in the doc, are we are trying to retrofit scenarios that Spark is not trying to handle ? Namely: some task for some stage must only run on a particular executor and not run anywhere else.
   I agree with @cloud-fan that there are too many interacting aspects that need to be carefully looked at here (resource allocation, fault tolerance, utilization, infinite wait for schedule, etc).
   
   On other hand, the usecase @tgravescs mentioned is an interesting one - how to change schedule behavior towards specific resource usage patterns : like bin-packing executors, etc. I think there have been past PR's towards that (particularly in context of elastic cloud env)
   Those require a global view to make decisions though, not just for a single executor.
   
   Making task scheduling pluggable would be an interesting experiment, but this has to be approached carefully given the interactions. Also, from an interface point of view, we want to ensure it is not specific to a single usecase.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846329353


   I'm not sure about the scenario of leveraging PVC as checkpoint location - at least that sounds to me as beyond the support of checkpoint in Structured Streaming.
   
   We have been clearly describing about the requirement of checkpoint location in Structured Streaming guide page, like following:
   
   > Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
   
   I know we allow custom checkpoint manager implementations to deal with non-HDFS compatible file system (like object stores which don't provide "atomic rename"), but they still deal with "remote" "fault-tolerant" file system, and doesn't require Spark scheduler to schedule specific task to specific executor based on the availability of checkpoint location.
   
   In other words, only checkpoint manager handles the complexity of checkpoint on file system, not somewhere else. And sounds like it's no longer holding true if we want to support PVC based checkpoint. Please correct me if I'm missing something.
   
   I'm more likely novice on cloud/k8s, but from the common sense, I guess the actual storage of PVC should be still a sort of network storage to be resilient on "physical node down". I'm wondering how much benefits PVC approach gives compared to the existing approach as just directly use remote fault-tolerant file system. The benefits should be clear to cope with additional complexity.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824218678


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42266/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824182278


   **[Test build #137739 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137739/testReport)** for PR 32136 at commit [`29c0b6b`](https://github.com/apache/spark/commit/29c0b6b6495fa4366129f0a086d72deed8a985ca).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-827127527


   > Shall we have a chance to look into the case you have resolved and how you do that? It would be easier for you to persuade others via showing exactly what is happening and how this will help. Providing actual implementation would be one of, and if you like to do then I'll wait for that instead before looking into this. During investigation on real world problem, we might be able to find alternatives and compare them.
   
   Thanks. Yes, that is also I want to do. I'm implementing custom scheduling for SS stateful operations. I will open the reference PR soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819352186


   > ...it can distribute tasks to executors more evenly.
   
   I think the scheduler already distributes tasks evenly when there's no locality preference as we'll shuffle the executors before scheduling:
   
   https://github.com/apache/spark/blob/b5241c97b17a1139a4ff719bfce7f68aef094d95/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L535
   
   Doesn't it work?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-858375487


   > The mapping could be between specific resources (e.g. PVC) and task (i.e. state store). 
   
   Your rephrase looks good except for one point here.  "task (i.e. state store)"? You mean task is kind of a type of state store? is it a typo? I actually expect that it's a mapping between PVC and task Id.
   
   > Actually we cannot schedule a task to specific location of statestore.
   
   I don't understand this. I assume each statestore must be bound to a specific location. Why we can't schedule the task?
   
   > Maybe `ResourceLocation`? It means the task prefers a location with specific resource (e.g. PVC).
   
   `ResourceLocation` sounds too general. Mabye, `RequiredResourceLocation`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820810395


   **[Test build #137448 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137448/testReport)** for PR 32136 at commit [`5f6ed13`](https://github.com/apache/spark/commit/5f6ed13f076ad84660780999e03b2b9d71a1f1c3).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-853697668


   (Sorry about the late reply.)
   
   > There is assertion that dynamic allocation should be enabled under stage-level scheduling. I mean if we remove such assertion, will it affect normal cases of stage-level scheduling?
   
   Could you post the code snapshot?
   
   --
   
   I’m thinking about how to establish the “mapping” with stage level scheduling. My current idea is:
   Add a new type of task location, e.g., `StateStoreTaskLocation(host, executorId, StateStoreProviderId)` , and let `BaseStateStoreRDD.getPreferredLocations` returns it in string. Then, the `TaskSetManager` could establish the “mapping” while building the pending task list:
   https://github.com/apache/spark/blob/2658bc590fec51e2266d03121c85b47f553022ec/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L250 
   , and probably using a map (from task index to its StateStoreProviderId) to store the mapping.
   
   @tgravescs @viirya WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-864070845


   is the plan to continue discussion in the doc and jira then?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r633703984



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       > If it is not to be exposed, then why make it a plugin? Isn't the risk that you are now overgeneralizing something for a non-existent benefit?
   
   I agree now the class name might be confusing. I thought it as a plugin. Now it is not a plugin you can plug into Spark but a private API.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-844775582


   Thanks for the ping @xuanyuanking 
   
   > does it fit into stage level scheduling
   
   This sounds feasible to me. We can treat the state store as a resource for the streaming task. And since the `StateStoreProviderId` is shared between batches, tasks between batches must be assigned the same state store as long as they require the same `StateStoreProviderId` (which is guaranteed by the stage level scheduling mechanism). Here's pseudo code may look like:
   
   ```scala
   case class StateStoreRDD {
   
    ...
    this. withResources(new ResourceProfile().add(StateStoreProviderId))
    ...
   }
   ```
   
   On the other side, driver should be able to update `ExecutorData.resourcesInfo` when `StateStoreCoordinatorRef` receives the active state store instance register so that the executor would contain the state store resource.
   
   One thing we need to pay attention to is that: there're might be no available executors for the specific `StateStoreProviderId` due to executor lost or 1st batch (where state store hasn't established
   ) which could leads the scheduling hang. Thus, I'm thinking of making the state store as an "optional" resource.
   
   While stage level scheduling solving the "must run at a particular executor" problem, the problem of unenvely distribution of the first batch still exits. I don't have a good idea yet. But I think we can add hack code in scheduler anyway (e.g., we can add the strategy as you added in the the #32422) as long as we know its the 1st batch.
   
   Thoughts?
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826124648


   **[Test build #137892 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137892/testReport)** for PR 32136 at commit [`173bb07`](https://github.com/apache/spark/commit/173bb07a9251141f7081ed8cd60c9adae3c566bd).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-859233951


   sgtm.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r612774003



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       I'm not sure now if we want it to be open to implement outside Spark. So just as private. Can be public if we have a consensus.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819097974


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41887/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818307007


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41825/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819960760


   > For the code snippet, doesn't it depend on if all executors are available during the moment of making offers?
   
   Yes, that's true. But normally, even in the case of offering a single resource that released from a single task, it seems it's less possible to schedule tasks unevenly unless the resources are really scarce.
   
   > For example, running SS job with state stores, it is easier to see the initial tasks are scheduled to part of all executors.
   
   Do you have logs related to the scheduling? I'd like to see how it happens.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818526472


   @viirya why is the current locality scheduling mechanism not good enough? What problem are you trying to fix?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826130994






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821932387


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42094/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821088567


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137473/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845752591


   > Oh, I mean we don’t use HDFS for state store reconstruction.
   
   Ah, for HDFS based state store, I was trying to explain how stage level scheduling should work for it excepts your PVC case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818550415


   #30812 is a previous attempt to use locality mechanism to stabilize state store location. Basically I want to do is to avoid Spark schedule streaming tasks which use state store (let me call them stateful tasks) to arbitrary executors. In short it wastes resource consumption on state store, and costs extra time on restoring state store on different executors.
   
   For the use-case, current locality seems a hacky approach as we can just blindly assign stateful tasks to executors evenly. We do not know if the assignment makes sense for the scheduler. It makes me think that we may need an API that we can use to provide scheduling suggestion.
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821932387


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42094/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824424654


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42278/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845737203


   > > Just thinking out loud, if we still have to do something in the scheduler as plugin and that fulfills both requirements then why have 2 solutions (1 stage level scheduling and 1 scheduler plugin).
   > 
   > I may not present my idea clearly..I actually mean we don't add the plugin but replace it with the solution of stage level scheduling + add evenly distribution/spreading strategy (with an option maybe as you mentioned) to scheduler directly.
   
   Let me know if I misunderstand it. So I think the idea is, for example, we go to set a flag at the task set of first micro batch. The flag tells Spark scheduler that we want this task set to be evenly distributed. Is it?
   
   Actually it sounds hacky to me. But as I'm not used to work in core module, so if you and others working in core frequently agree with it. I'm okay for this as a solution to the first micro-batch issue.
   
   Besides, I also need to figure out if stage-level scheduling can satisfy the other requirements (e.g. my above comment).
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on a change in pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r612445131



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       why is this private to spark if people are supposed to implement?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845745942


   > 
   <div class="js-discussion js-socket-channel ml-0 pl-0 ml-md-6 pl-md-3" data-channel="eyJjIjoibWFya2VkLWFzLXJlYWQ6MTYzOTcxNzQiLCJ0IjoxNjIxNTgzNTQzfQ==--6fa852c012afa82000dfc33c77461164153e5dd7f69a98ee63d2c88c9b6e742d" data-channel-target="MDExOlB1bGxSZXF1ZXN0NjEzOTgwMTM3" style="box-sizing: border-box; margin-left: 40px !important; padding-left: 16px !important; color: rgb(36, 41, 46); font-family: -apple-system, system-ui, &quot;Segoe UI&quot;, Helvetica, Arial, sans-serif, &quot;Apple Color Emoji&quot;, &quot;Segoe UI Emoji&quot;; font-size: 14px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;"><div class="js-timeline-it
 em js-timeline-progressive-focus-container" data-gid="MDEyOklzc3VlQ29tbWVudDg0NTc0NTM0Nw==" style="box-sizing: border-box;"><div class="TimelineItem js-comment-container" data-gid="MDEyOklzc3VlQ29tbWVudDg0NTc0NTM0Nw==" data-url="/_render_node/MDEyOklzc3VlQ29tbWVudDg0NTc0NTM0Nw==/timeline/issue_comment?variables%5BdeferredCommentActions%5D=false" style="box-sizing: border-box; position: relative; display: flex; padding: 16px 0px; margin-left: 16px;"><div class=" timeline-comment-group js-minimizable-comment-group js-targetable-element TimelineItem-body my-0 " id="issuecomment-845745347" style="box-sizing: border-box; margin-top: 4px; margin-bottom: 0px !important; min-width: 0px; max-width: 100%; color: var(--color-timeline-text); flex: 1 1 auto;"><div class="ml-n3 timeline-comment unminimized-comment comment previewable-edit js-task-list-container editable-comment js-comment timeline-comment--caret reorderable-task-lists" data-body-version="6f5dfd553ccc8e0ba2747426cf11be7b7694192baf
 d595ad23ef8f8bd48fa182" data-unfurl-hide-url="/content_reference_attachments/hide" style="box-sizing: border-box; margin-left: -16px !important; position: relative; color: var(--color-text-primary); background-color: var(--color-bg-primary); border: 1px solid var(--color-border-primary); border-radius: 6px;"><div class="edit-comment-hide" style="box-sizing: border-box;"><task-lists sortable="" style="box-sizing: border-box;">Oh, I mean we don't use HDFS for state store reconstruction.</task-lists></div><form class="js-comment-update" id="issuecomment-845745347-edit-form" action="https://github.com/apache/spark/issue_comments/845745347" accept-charset="UTF-8" method="post" style="box-sizing: border-box;"></form></div></div></div></div><div id="partial-timeline" class="js-timeline-marker js-socket-channel js-updatable-content" data-channel="eyJjIjoicHVsbF9yZXF1ZXN0OjYxMzk4MDEzNyIsInQiOjE2MjE1ODQwOTV9--b2412dd2c1813a96fb304082ebf76be20b861af1aedf24ca35711483af330106" data-url="/_render
 _node/MDExOlB1bGxSZXF1ZXN0NjEzOTgwMTM3/pull_requests/unread_timeline?variables%5BdeferCommitBadges%5D=false&amp;variables%5BdeferStatusCheckRollups%5D=false&amp;variables%5BdeferredCommentActions%5D=true&amp;variables%5BhasFocusedReviewComment%5D=false&amp;variables%5BhasFocusedReviewThread%5D=false&amp;variables%5BtimelinePageSize%5D=30&amp;variables%5BtimelineSince%5D=2021-05-21T08%3A01%3A32Z" data-last-modified="Fri, 21 May 2021 08:01:32 GMT" data-gid="MDExOlB1bGxSZXF1ZXN0NjEzOTgwMTM3" style="box-sizing: border-box;"></div></div><div class="discussion-timeline-actions" style="box-sizing: border-box; background-color: var(--color-bg-canvas); border-top: 2px solid var(--color-border-primary); color: rgb(36, 41, 46); font-family: -apple-system, system-ui, &quot;Segoe UI&quot;, Helvetica, Arial, sans-serif, &quot;Apple Color Emoji&quot;, &quot;Segoe UI Emoji&quot;; font-size: 14px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter
 -spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;"><div id="partial-pull-merging" class="pull-merging js-pull-merging js-socket-channel js-updatable-content js-pull-refresh-on-pjax " aria-live="polite" data-channel="eyJjIjoicmVwbzoyMTQ2NzkwNzpicmFuY2g6U1BBUkstMzUwMjIiLCJ0IjoxNjIxNTgzOTc3fQ==--789c530f63163c0b7c8bc5c867b67b0ba09b81777ee86f4aa1c91223d834f15f eyJjIjoicmVwbzoxNzE2NTY1ODpicmFuY2g6bWFzdGVyIiwidCI6MTYyMTU4Mzk3N30=--960c892b2590716eefd2894a57a18d355a1af0508ab60c849ac8bcab8d3e3a24 eyJjIjoicmVwbzoxNzE2NTY1ODpjb21taXQ6MTczYmIwN2E5MjUxMTQxZjcwODFlZDhjZDYwYzlhZGFlM2M1NjZiZCIsInQiOjE2MjE1ODM5Nzd9--390e3c952019b8b994ea9f8531e1e98b3dbc17774140e03d8aa11c62185fbef0 eyJjIjoiaXNzdWU6ODU2Mzk2MTUyOnN0YXRlIiwidCI6MTYyMTU4Mzk3N30=--17028907c4ca30ac7fd6132dfeed15fcb875832b
 e075d6c832735b9f5952674f eyJjIjoicHVsbF9yZXF1ZXN0OjYxMzk4MDEzNzpyZXZpZXdfc3RhdGUiLCJ0IjoxNjIxNTgzOTc3fQ==--773202746fb365ab1aa5e7e6623e83484427b47747296dc8ce0ab5717e19167f eyJjIjoicHVsbF9yZXF1ZXN0OjYxMzk4MDEzNzp3b3JrZmxvd19ydW4iLCJ0IjoxNjIxNTgzOTc3fQ==--1b8fb2ef491cf98d46b2e670f2969ffe82581862c8639301693d28413a96d305" data-url="/apache/spark/pull/32136/show_partial?merge_type=squash&amp;partial=pull_requests%2Fmerging" style="box-sizing: border-box;"><br class="Apple-interchange-newline"></div></div>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824259805


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137739/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819155836


   > I just saw https://docs.google.com/document/d/1wfEaAZA7t02P6uBH4F3NGuH_qjK5e4X05v1E5pWNhlQ/edit# which has a few details. Would be good to link from description.
   
   Linked it from the description. Thanks for reminding.
   
   
   > questions:
   > 
   >     1. what happens with locality? it looks like this is plugged in after locality, are you disabling locality then or it doesn't have any for your use case?   if we create a plugin to choose location I wouldn't necessarily want locality to take affect.
   
   For now it still respects locality. My thought is to not interfere the scheduler too much. For each locality level, the scheduler will try to pick up one task from a list of tasks for the particular locality level. The API jumps in at the moment and let the scheduler know which tasks are most preferred on the executor. If users don't want locality to take effect, it is doable by disabling locality configs. Otherwise, from the API perspective, if it doesn't want a particular task to be on an executor, it can also let the scheduler know (aka, don't have it in the returned list).
   
   >     2. @param tasks The full list of tasks => this is all tasks even if done?  Would you want to know which ones are running already or which was have succeeded
   
   We may not need to know. The parameter can be discussed. As `taskIndexes` are indexes to the full task list, it is easier for us to get task from the index. Can be the subset of tasks, i.e. the tasks of the passed in task indexes pointing to, if it is preferred.
   
   >     3. this is being called from synchronized block, in the very least we need to document better and affects it could have on scheduling time
   
   Yea, I thought about it, but forgot to add the comment. I will do it in next commit.
   
   >     4. it looks like your plugin runs before blacklisting, is this really what we want or would plugin like to know to make better decision?
   
   Good point. I think the reason it runs before blacklisting is that it is easier to fit into current logic and seems safer. Currently the scheduler iterates each task from the list, and then checks blacklist before picking it up. If I let the API gets a list after blacklisting, seems it might be a larger change to the dequeue logic.
   
   >     5. how does this interact with barrier where it resets things if it doesn't get scheduled?
   
   IIUC, the plugin does not affect or change how the scheduler acts on barrier tasks. In the current dequeue logic, the scheduler doesn't have different behavior on barrier task/general task. For now if the scheduler cannot schedule all barrier tasks at once, it will reset the assigned resource offers. It is the same with the plugin.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821944239


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137520/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819075094


   **[Test build #137307 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137307/testReport)** for PR 32136 at commit [`3097e73`](https://github.com/apache/spark/commit/3097e732bb9a0d33fa090f0ff13c9a5ef752dbcc).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845780511


   > > So for the case that an executor with PVC_0 is lost, for example, seems the schedule will wait until another executor with PVC_0 comes to offer, is it correct?
   > 
   > Correct.
   
   Thank you @Ngone51. I will play around with stage level scheduling for POC and see how it goes to work with our use-case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845740021


   > No. In our use-case, we want to get rid of HDFS for state store checkpoint. So the task will wait until the PVC re-mounts to another new executor. Our state store is checkpointed to PVC, not HDFS.
   
   Not sure which point you say "No" to..IIUC, I think stage level scheduling should work for your use case as long as you would let the driver know when the PVC re-mounts to another new executor (e.g., just reports the state stores ids related to that PVC).  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824259805


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137739/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-854716584


   > But if we still have to introduce many invading changes (e.g., the mapping) even if reusing the stage level scheduling, I think we should revisit our decision.
   
   I agree with this, its a matter of coming up with the right design to solve the problem and possibly others (in the case of plugin).  If we discuss alternatives that become to complex we should drop them.   But we should have the discussion like we are.
   
   > BTW, it seems not possible for end users to specify the resource request by themselves as streaming uses the DataFrame API and StateStoreRDD hides from it.
   
   if user can't specify it themselves with stage level api, you are saying Spark would internally do it for the user?
   
   > There is assertion that dynamic allocation should be enabled under stage-level scheduling. I mean if we remove such assertion, will it affect normal cases of stage-level scheduling?
   
   We can relax the requirement if something like this is specified. If we were to add allowing new ResourceProfiles to fit into existing containers that requirement would be relaxed for that also.  We just need to make sure its clear to user so jobs don't hang waiting on getting containers they will never get.
   
   > I’m thinking about how to establish the “mapping” with stage level scheduling. My current idea is:
   Add a new type of task location, e.g., StateStoreTaskLocation(host, executorId, StateStoreProviderId) , and let BaseStateStoreRDD.getPreferredLocations returns it in string. Then, the TaskSetManager could establish the “mapping” while building the pending task list:
   
   so essentially this is extending the locality feature and then the only thing you would need in stage level scheduling api is ability to say use this new locality algorithm for this stage?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845123480


   For the stage level scheduling option, is the state store essentially the same across all executors?  meaning I don't care that a task gets on a particular executor that exactly matches an instance of a state store, but any executor as long as it has a state store.  Sorry here, my knowledge on the streaming is lacking.
   How is state store reconstructed when executor lost?  I assume its when a streaming task is assigned and the executor is missing the state store and not automatically on executor lost?
   
   > While stage level scheduling solving the "must run at a particular executor" problem, the problem of unenvely distribution of the first batch still exits. I don't have a good idea yet. But I think we can add hack code in scheduler anyway (e.g., we can add the strategy as you added in the the #32422) as long as we know its the 1st batch.
   
   Just thinking out loud,  if we still have to do something in the scheduler as plugin and that fulfills both requirements then why have 2 solutions (1 stage level scheduling and 1 scheduler plugin). 
   
   I left a bunch of comments in the design doc, I think there is still a bunch of details missing that might help find correct solution.  another option for the spreading is perhaps we just need a separate option for that, like spread vs cluster and then use stage level scheduling for state store but it depends on other details.  Is locality specified in the first set of tasks here that you want spread?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820827695






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824294219


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137741/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-833680616


   > I don't understand this. if the executor is lost isn't your state store lost? if its just on the host separate from executor, then host locality should still apply. Or are you referring to some new checkpoint mechanism where after a failure you would want the locality to change to where that checkpoint is?
   
   For example, we let state store checkpoint to a persistent volume on the executor, instead of HDFS. Once the executor is lost, we are able to mount the pv back on a new executor or another existing executor. Now, we don't want the state store to be assigned to any executor, but the executor with the pv with checkpointed state.
   
   BTW, how locality deals with executor lost? If I understand correctly, doesn't a long locality (e.g. 10h) mean Spark will hold the task waiting for the lost executor until locality wait finishes?
   
   > I'm definitely not against improving it or scheduling in general, and I never said that. I'm hesitant about the current proposal and implementation. I want clear goals and use cases it applies to, in order to make sure its implemented and solved in the proper way. I think my biggest complaint here is there is no complete overview or details, this kind of has piece meal explanations that when I look at seem to have holes in it, so I don't have a complete picture. I'm not sure this is the best place to plugin at if you really want flexibility.
   
   Sorry if I didn't state clearly the goals and our use-cases, although I thought I already explained above during the discussion. Let me reclaim it:
   
   We need the ability to have tasks (basically stateful tasks matter to us) scheduled on the executors we want. In other words, we need the ability to control where a task is scheduled to. Our goal is to improve the SS state store scheduling and checkpoint. Currently users can only rely on locality. However, locality has first-batch issue (explained in previous comment). Also, relying on user-facing config to solve platform problem seems a flaky approach. Not every user knows to set it.
   
   So it is clear that our use-case is for SS jobs with state store. One use-case of checkpoint with pv is stated at the beginning.
   
   The use-case is specific to SS. But, as Spark doesn't have scheduling plugin/API, we cannot limit the change only to SS module.
   
   > For instance I asked about this plugging in after locality and you said "If users don't want locality to take effect, it is doable by disabling locality configs". But this disables it for the entire job. If you just want spread on the first stage for instance and then everything after that to have locality, that doesn't work.
   
   Let's revisit the discussion:
   
   You question:
   > 1. what happens with locality? it looks like this is plugged in after locality, are you disabling locality then or it doesn't have any for your use case?   if we create a plugin to choose location I wouldn't necessarily want locality to take affect.
   
   Because you said if we create a plugin, you wouldn't want locality to take affect. So my answer is, you can disable the locality, if you doesn't want locality to take affect.
   
   I hope that can clarify it. Let me know if I misunderstand your questions.
   
   BTW, we also let stateful tasks only use plugin or locality, not both. If your question is, we don't want locaility work on stateful tasks but still want it for others.
   
   > so I know there is a doc associated with this, but I think it's not complete enough. I think it should go into more detail about specific problem, why locality isn't sufficient (or where locality isn't sufficient (first stage)), how those things interact, what all use cases this applies to, what use cases it doesn't solve or deficiencies in doing it this way. how does this really flow with SS in the different cases where executor lost and state store, explain it for someone who might not be familiar with it. @mridulm brought up a bunch of issues above in his comment as well.
   
   Okay. Let me enrich the doc with more details. Thanks for the suggestions.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-857846015


   > But the "mapping" between tasks and the specific resources (statestore in this case) is a new requirement that we have to add.
   
   Hm? We don't need mapping between tasks <-> statestore. Do you mean PVC id <-> task (i.e. statestore)? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-823978363


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42256/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846827648


   BTW, @Ngone51 Is dynamic allocation is required for stage-level scheduling? At least it is what I got from reading the code. But seems dynamic allocation has some issues (or not work well?) with SS (e.g. SPARK-24815)? Right? @HeartSaVioR 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821071158


   **[Test build #137473 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137473/testReport)** for PR 32136 at commit [`1178309`](https://github.com/apache/spark/commit/11783096183f7338f496b299b27935acc35e5097).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds the following public classes _(experimental)_:
     * `class TestSchedulingPlugin2 extends TaskSchedulingPlugin `


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824424561


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42278/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya closed pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya closed pull request #32136:
URL: https://github.com/apache/spark/pull/32136


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a change in pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r633359662



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       Can't we add a locality constraints that forces you to schedule on an executor instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820867957


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137448/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826631839


   Shall we have a chance to look into the case you have resolved and how you do that? It would be easier for you to persuade others via showing exactly what is happening and how this will help. During investigation on real world problem, we might be able to find alternatives and compare them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846336246


   > If the persistent volume is a resource, then it will have to be there on executor startup, so I guess a new executor checks for it on startup and advertises it. At that point, how does a task tell the difference between its state store and another one? The scheduler for resources only checks that the number of a particular resource match the task requirements, it doesn't differentiation different state store "ids". So it will assign a task to any executor with a PV.
   
   @tgravescs To clarify, I think it's state store rather than PV is the resource here. And the use case here might be a bit different from the classic use case(where the resources must be specified for executor launch). In this case, the executor can tell the state store resource to the driver only when the first time the state store instance is constructed. That means this special resource will be updated at runtime. (Streaming has the existing event `ReportActiveInstance` to report the state store instances, and we can extend it to update the executor's state store resources). To follow the existing custom resource management, the state store resource might be maintained as ("statestore" -> list of statestore ids) as a part of the custom resource. 
   
   
   > another point - What if entire node is lost not just executor? I guess the discovery script starting up new executor would load it from HDFS???
   
   So as mentioned above, we don't need the discovery script here. But yes the state store instances can be loaded from HDFS.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-859263950


   I am trying to catch up on this discussion, and it is a very long thread already :-) Thanks for all the discussion !
   Can we update the doc @viirya given that there seems to be some consensus developing ?
   Based on that, I will revisit comments to understand the rationales better for the various conclusion points.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818309043


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41825/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-859926068


   @mridulm @tgravescs Yeah, I will update the doc. Thanks for the discussion!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820810395


   **[Test build #137448 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137448/testReport)** for PR 32136 at commit [`5f6ed13`](https://github.com/apache/spark/commit/5f6ed13f076ad84660780999e03b2b9d71a1f1c3).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821022078


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42048/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821931345






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846699726


   > That said, personally, as I commented on above comment thread, I'd like to see the interface to let end users do the hack on Spark and do whatever they want, with taking their own risks. In other words, task level scheduling itself sounds OK to me, though I promise I'll respect the decision of committers on CORE area.
   > 
   > Btw, IMHO, at least for now, initial state distribution and leveraging PVC are something which should be proved in various use cases / workloads in production, before taking them on the plate of discussions. Before then, I'd like to see Spark be customizable on their needs, so that they are no longer blocked on Spark side.
   
   This API is proposed to let other developers to change the way in Spark. And yes, taking with their own risks. Actually this might be most less intrusive way to support the improvements on SS side. We don't need to touch other core pieces but just let developers to work with the API.
   
   Maybe we even don't need to add the PVC stuff into Spark upstream. As it is pretty clean API implementation, we can just implement it at our side if Spark upstream doesn't want to take the complexity. Mentioning PVC is to show the use-case in our mind when we were asked about how we are going to use the API.
   
   Unfortunately so far this is unable to push forward. And I'm not sure right now how much change is needed for stage-level scheduling for similar function. But I also want to respect the decision of committers on core area, so I'd like to take a look on stage-level scheduling as it is suggested above.
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824224659


   **[Test build #137741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137741/testReport)** for PR 32136 at commit [`d2a2b46`](https://github.com/apache/spark/commit/d2a2b46c5b3b68a30cddc43e9246e610c52e90f3).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824224659


   **[Test build #137741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137741/testReport)** for PR 32136 at commit [`d2a2b46`](https://github.com/apache/spark/commit/d2a2b46c5b3b68a30cddc43e9246e610c52e90f3).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-832536972


   @tgravescs @mridulm @HeartSaVioR @Ngone51 @cloud-fan Any more suggestive comments? As this is not public API, is there concern to have this as an experiment to unblock further development? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-831053315


   A reference implementation for custom stateful task scheduling is at #32422.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846329353


   I'm not sure about the scenario of leveraging PVC as checkpoint location - at least that sounds to me as beyond the support of checkpoint in Structured Streaming.
   
   We have been clearly describing about the requirement of checkpoint location in Structured Streaming guide page, like following:
   
   > Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
   
   I know we allow custom checkpoint manager implementations to deal with non-HDFS compatible file system (like object stores which don't provide "atomic rename"), but they still deal with "remote" "fault-tolerant" file system, and doesn't require Spark scheduler to schedule specific task to specific executor based on the availability of checkpoint location.
   
   In other words, only checkpoint manager handles the complexity of checkpoint on file system, not somewhere else. And sounds like it's no longer holding true if we want to support PVC based checkpoint. Please correct me if I'm missing something.
   
   I'm more likely novice on cloud/k8s, but from the common sense, I guess the actual storage of PVC should be still a sort of network storage to be resilient on "node down". I'm wondering how much benefits PVC approach gives compared to the existing approach as just directly use remote fault-tolerant file system. The benefits should be clear to cope with additional complexity.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-832927163


   Evenly state store distribution is just one benefit the plugin enables. Locality cannot be used programmingly to control stateful tasks. For example, after setting up a long locality, once an executor is lost, will Spark be able to choose another  executor met our need? We plan to further enhance current SS checkpoint mechanism. One necessary piece is to be able to control stateful task location in cases like that.
   
   I'm willing to limit the change only to SS, but unfortunately Spark doesn't provide an API for task scheduling. Since I begin to work on SS in last few months, I feel that SS is somehow a neglected module. Some important features are stuck in past few years. SS is far behind other streaming solution in features. We still believe Spark can be our streaming solution. Driven by customer need on their streaming applications, we are working on to revive the features (session window, rocksdb state store) and also plan on new enhancements (checkpoint).
   
   Appreciate if you can re-consider the possibility to add this scheduling plugin. I'm open to change the API to be more general for other use-cases if you think it is better.
   
   cc @dbtsai @dongjoon-hyun 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824166177


   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818919213


   
   > I don't think we can guarantee it. It's a best effort and tasks should be able to run on any executor, thought tasks can have preferred executors (locality). Otherwise, we need to revisit many design decisions like how to avoid infinite wait, how to auto-scale, etc.
   
   > > to avoid Spark schedule streaming tasks which use state store (let me call them stateful tasks) to arbitrary executors.
   
   Sorry I think this sentence is misleading. I don't mean to break "tasks should be able to run on any executor" design. This API doesn't break it. What we want is to be able to set constraint/condition on choosing which task to be scheduled to an executor. So basically tasks are still able to run on any executor. Just for some purpose, we need an executor to pick up a task.
   
   > Can you elaborate? If it's a problem of delay scheduling let's fix it instead.
   
   In #30812, @zsxwing, @HeartSaVioR and me have a long discussion around using locality for stateful tasks. You can see my original approach is to use locality, but overall it is considered too hacky and now I share the points from them. You may catch up the comments there.
   
   Basically I think the problem is, for a stateful job that we want to evenly distribute tasks to all executors and let the executor-task mapping relatively stable. With locality, we can only assign tasks to executors blindly. For example, the scheduler knows more about executor capacity and knows what executors should be assigned with tasks. But in SS, we don't have such info (and should not have it too).
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824444333


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137751/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820861744


   **[Test build #137448 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137448/testReport)** for PR 32136 at commit [`5f6ed13`](https://github.com/apache/spark/commit/5f6ed13f076ad84660780999e03b2b9d71a1f1c3).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820867957


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137448/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-823978317






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-844775582


   Thanks for the ping @xuanyuanking 
   
   > does it fit into stage level scheduling
   
   This sounds feasible to me. We can treat the state store as a resource for the streaming task. And since the `StateStoreProviderId` is shared between batches, tasks between batches must be assigned the same state store as long as they require the same `StateStoreProviderId` (which is guaranteed by the stage level scheduling mechanism). Here's pseudo code may look like:
   
   ```scala
   case class StateStoreRDD {
   
    ...
    this.withResources(new ResourceProfile().add(StateStoreProviderId))
    ...
   }
   ```
   
   On the other side, driver should be able to update `ExecutorData.resourcesInfo` when `StateStoreCoordinatorRef` receives the active state store instance register so that the executor would contain the state store resource.
   
   One thing we need to pay attention to is that: there're might be no available executors for the specific `StateStoreProviderId` due to executor lost or 1st batch (where state store hasn't established
   ) which could leads the scheduling hang. Thus, I'm thinking of making the state store as an "optional" resource.
   
   While stage level scheduling solving the "must run at a particular executor" problem, the problem of unenvely distribution of the first batch still exits. I don't have a good idea yet. But I think we can add hack code in scheduler anyway (e.g., we can add the strategy as you added in the the #32422) as long as we know its the 1st batch.
   
   Thoughts?
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819342631


   > So, how would the plugin help when there's no previous location info?
   
   For example, in the plugin implementation, it can distribute tasks to executors more evenly.
   
   > I think this is the point that matches my point of case b above.
   > 
   > But looking at the code, it seems the plugin is still applied after locality scheduling?
   
   Locality still works. Generally, this plugin API doesn't break locality if it is set.
   
   > > 1. Forcibly assigning state stores to executors can possibly lead to unreasonable scheduling decision. For example, we don't know if the executor satisfy resource requirement.
   > 
   > I don't get this. Do you mean some executors may not be suitable for having a state store?
   
   For example, an executor is not capable for running the task? If we blindly assign stateful task to executor, we don't actually know if the executor is capable for the task. Only the scheduler knows the info. I think this is the major point discussed in my previous PR.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on a change in pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r612445523



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       should be marked with developer api




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846691545


   That said, personally, as I commented on above comment thread, I'd like to see the interface to let end users do the hack on Spark and do whatever they want, with taking their own risks. In other words, task level scheduling itself sounds OK to me, though I promise I'll respect the decision of committers on CORE area.
   
   Btw, IMHO, at least for now, initial state distribution and leveraging PVC are something which should be proved in various use cases / workloads in production, before taking them on the plate of discussions. Before then, I'd like to see Spark be able to customize on their needs, so that they are no longer blocked on Spark side.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846329353


   I'm not sure about the scenario of leveraging PVC as checkpoint location - at least that sounds to me as beyond the support of checkpoint in Structured Streaming.
   
   We have been clearly describing about the requirement of checkpoint location in Structured Streaming guide page, like following:
   
   > Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
   
   I know we allow custom checkpoint manager implementations to deal with non-HDFS compatible file system, but they still deal with "remote" "fault-tolerant" file system, and doesn't require Spark scheduler to schedule specific task to specific executor based on the availability of checkpoint.
   
   In other words, only checkpoint manager handles the complexity of checkpoint on file system, not somewhere else. And sounds like it's no longer holding true if we want to support PVC based checkpoint. Please correct me if I'm missing something.
   
   I'm more likely novice on cloud/k8s, but from the common sense, I guess the actual storage of PVC should be still a sort of network storage to be resilient on "node down". I'm wondering how much benefits PVC approach gives compared to the existing approach as just directly use remote fault-tolerant file system. The benefits should be clear to cope with additional complexity.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-857859603


   > Yes. I'm thinking a bit more: we probably even don't need the stage level scheduling API ability. After knowing the "mapping", we can use it directly in resourcesMeetTaskRequirements. The "mapping" is actually a hard-coded task requirement, and use stage level scheduling API ability to specify that requirement looks redundant and unnecessary.
   
   Sounds making sense. So let me rephrase it, and correct me if I misunderstand it.
   
   Basically, we introduce new task location `StateStoreTaskLocation`. The RDDs using statestore return this kind of task location as preferred locations.
   
   When `TaskSetManager` builds the pending task list, it could establish a mapping from the locations. The mapping could be between specific resources (e.g. PVC) and task (i.e. state store). `resourcesMeetTaskRequirements` directly uses the mapping to schedule tasks.
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819661211


   > I think the scheduler already distributes tasks evenly when there's no locality preference as we'll shuffle the executors before scheduling:
   > 
   > Doesn't it work?
   
   For the code snippet, doesn't it depend on if all executors are available during the moment of making offers? It seems to be unreliable due to a problem like race condition. For example, running SS job with state stores, it is easier to see the initial tasks are scheduled to part of all executors.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-858365193


   BTW, `StateStoreTaskLocation` is not an accurate name for the purpose. Actually we cannot schedule a task to specific location of statestore. Task-statestore relation is fixed at the beginning. Maybe `ResourceLocation`? It means the task prefers a location with specific resource (e.g. PVC).
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a change in pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r633357559



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       If it is not to be exposed, then why make it a plugin? Isn't the risk that you are now overgeneralizing something for a non-existent benefit?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824254589


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42268/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r637338852



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       > I'm not sure now if we want it to be open to implement outside Spark.
   
   IMHO I'm thinking opposite on this. I'm not sure if we want to add some implementation in Spark codebase, unless the implementation of plugin proves that it works well for all possible cases.
   
   I'd rather be OK to let Spark users (including us) to be hackers and customize the scheduler with "taking their own risks", but I have to be conservative if we want to change the behavior of Spark. If changing the behavior of Spark to cope with state by default is the final goal, I think the behavioral change itself is a major one requiring SPIP, not something we can simply go after doing POC on happy case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-863792282






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-833555770


   >  once an executor is lost, will Spark be able to choose another executor met our need?
   
   I don't understand this.  if the executor is lost isn't your state store lost?  if its just on the host separate from executor, then host locality should still apply.  Or are you referring to some new checkpoint mechanism where after a failure you would want the locality to change to where that checkpoint is? 
   
   I don't use SS and I'm not familiar with all the aspects of scheduling for it so I might be missing parts as well.  I've seen the same thing, seems like not many people really working on it.
   
   I'm definitely not against improving it or scheduling in general, and I never said that. I'm hesitant about the current proposal and implementation. I want clear goals and use cases it applies to, in order to make sure its implemented and solved in the proper way.  I think my biggest complaint here is there is no complete overview or details, this kind of has piece meal explanations that when I look at seem to have holes in it, so I don't have a complete picture.  I'm not sure this is the best place to plugin at if you really want flexibility.
   
   For instance I asked about this plugging in after locality and you said "If users don't want locality to take effect, it is doable by disabling locality configs". But this disables it for the entire job.  If you just want spread on the first stage for instance and then everything after that to have locality, that doesn't work.
   
   so I know there is a doc associated with this, but I think it's not complete enough.  I think it should go into more detail about specific problem, why locality isn't sufficient (or where locality isn't sufficient (first stage)), how those things interact, what all use cases this applies to, what use cases it doesn't solve or deficiencies in doing it this way.  how does this really flow with SS in the different cases where executor lost and state store, explain it for someone who might not be familiar with it. @mridulm brought up a bunch of issues above in his comment as well. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818365180


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137245/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826133080






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846905399


   > BTW, @Ngone51 Is dynamic allocation is required for stage-level scheduling?
   
   It's required for the classic use case (when you really need to change executor resources dynamically) but not this case.
   
   In this case, we only leverage the task scheduling characteristic of the stage level scheduling. Thus, adding the task request for the state store resource only should be enough. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-844858158


   > However I'm not sure if stage level scheduling can deal with executor lost case. Based on above comment, seems it cannot. That will be a major concern for the use-case here. During the task scheduling, once an executor is lost, we may need the scheduler be able to re-schedule the task to a particular executor (e.g. reused PVC in our case).
   
   So what if the state store resource is **required** not **optional**? It means, the task won't launch until getting the required state store. So in your PVC case, the task will wait until it re-mount to some executors. And if we make state store resource required, we should do the similar thing for the HDFS state store on executor lost. For example, we should reconstruct the state store on other active executors so that the state store resources always exist and scheduling won't hang. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846116061


   > So for the case that an executor with PVC_0 is lost, for example, seems the schedule will wait until another executor with PVC_0 comes to offer, is it correct?
   
   I'm not following exactly how this will work. Again this might be lacking knowledge on SS.
   
   If the persistent volume is a resource, then it will have to be there on executor startup, so I guess a new executor checks for it on startup and advertises it.   At that point, how does a task tell the difference between its state store and another one?  The scheduler for resources only checks that the number of a particular resource match the task requirements, it doesn't differentiation different state store "ids".  So it will assign a task to any executor with a PV.
   
   another point - What if entire node is lost not just executor?   I guess the discovery script starting up new executor would load it from HDFS???


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-853697668


   (Sorry about the late reply.)
   
   > There is assertion that dynamic allocation should be enabled under stage-level scheduling. I mean if we remove such assertion, will it affect normal cases of stage-level scheduling?
   
   Could you post the code snapshot?
   
   --
   
   I’m thinking about how to establish the “mapping” with stage level scheduling. My current idea is:
   Add a new type of task location, e.g., `StateStoreTaskLocation(host, executorId, StateStoreProviderId)` , and let `BaseStateStoreRDD.getPreferredLocations` returns it in string. Then, the `TaskSetManager` could establish the “mapping” while building the pending task list:
   https://github.com/apache/spark/blob/2658bc590fec51e2266d03121c85b47f553022ec/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L250 
   , and probably using a map (from task index to its StateStoreProviderId) to store the mapping.
   
   @tgravescs @viirya WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824258865


   **[Test build #137739 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137739/testReport)** for PR 32136 at commit [`29c0b6b`](https://github.com/apache/spark/commit/29c0b6b6495fa4366129f0a086d72deed8a985ca).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846691545


   That said, personally, as I commented on above comment thread, I'd like to see the interface to let end users do the hack on Spark and do whatever they want, with taking their own risks. In other words, task level scheduling itself sounds OK to me, though I promise I'll respect the decision of committers on CORE area.
   
   Btw, IMHO, at least for now, initial state distribution and leveraging PVC are something which should be proved in various use cases / workloads in production, before taking them on the plate of discussions. Before then, I'd like to see Spark be customizable on their needs, so that they are no longer blocked on Spark side.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846713976


   > If I understand correctly about stage level scheduling, you still need to specify "all" resources needed for "all" tasks in StateRDD; while that may block Spark to schedule when some resources are missing (like lost executor with PVC), I'm wondering how task level schedule would work as its intention. After this, locality is the only one we can deal with, and it's not an enforcement so we're back to the origin problem.
   
   That's true. I think we can extend the `ResourceProfile.defaultProfile` by adding the state store request to id.
   
   And we may not need to add the state store request to the executor (but task only) so the executor doesn't need to load the state store at its launch time while using dynamic allocation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846687139


   My major point is about the characteristic of the checkpoint location.
   
   We require checkpoint location to be "fault-tolerant" including hardware failures (local storage doesn't make sense here), and provide "high availability" by itself so that Spark can delegate such complexity to the checkpoint location. For sure, such requirement leads underlying file system to be heavy and non-trivial to maintain, but IMHO that's not an enough reason to take the complexity back to Spark, because:
   
   1. Spark is already quite complicated
   
   Nowadays it's quite uneasy to bring a major changes without affecting existing behaviors. Personally I'd like to see the mainline of Apache Spark as ensuring majority's demands, not everyone's demands.
   
   2. The industry on file systems (or alike) also makes improvements
   
   Many end users were trying to deal with checkpoint location in object stores despite of eventual consistency of S3, and it even became strong consistency. Object stores in Azure had been providing strong consistency if I understand correctly. Not 100% sure of GCS but I heard they claim strong consistency.
   
   HDFS has been exposing several shortcomings so far, but the community is also making improvements like Apache Ozone.
   
   3. Spark community even didn't try to optimize the path
   
   I'd interpret the reasons as two folds:
   
   A. Majority of real-world workloads are working well with current technology
   B. Some workloads don't work well, but no strong demand on this as possible issues are tolerable
   
   I think we still don't struggle for this enough. There're still spots to reduce the latency down, like I did on optimizing WAL commit phase (#31495). I'd like to put my efforts on helping majority's use cases.
   
   > Technically, PVC is kinds of abstract way to look at the volume mounted on container running executor. It could be local storage on nodes on k8s. It depends where the PVC is bound to.
   
   PVC is not a kind of abstraction which "guarantees" fault-tolerant file system, so it still has to depend on the actual file system under the hood, and also the guarantees on interface accessing file system. I imagine the operational costs on making PVC guarantees such requirements would be non-trivial as well.
   
   > Using PVC as checkpoint could be huge relief on the loading of HDFS. There are also others like better latency, simplified streaming architecture.
   
   I'd be happy to see the overall system design and the result of POC. Let's continue the talk about PVC once we get the details.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845694478


   > For the stage level scheduling option, is the state store essentially the same across all executors?
   
   No. Tasks with the different partition ids must use the different state stores.  And tasks with the same partition id between micro-batches must use the same state store.
   
   > How is state store reconstructed when executor lost? I assume its when a streaming task is assigned and the executor is missing the state store and not automatically on executor lost?
   
   Yes. Currently, it's reconstructed lazily. And that's preferable normally as reconstruction involves I/O operation - means it needs to read persisted state store data from HDFS and reconstruct an in-memory state store instance for fast operation.
   
   > Just thinking out loud, if we still have to do something in the scheduler as plugin and that fulfills both requirements then why have 2 solutions (1 stage level scheduling and 1 scheduler plugin).
   
   I may not present my idea clearly..I actually mean we don't add the plugin but replace it with the solution of stage level scheduling + add evenly distribution/spreading strategy (with an option maybe as you mentioned) to scheduler directly.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824444333


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137751/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824293638


   **[Test build #137741 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137741/testReport)** for PR 32136 at commit [`d2a2b46`](https://github.com/apache/spark/commit/d2a2b46c5b3b68a30cddc43e9246e610c52e90f3).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821022078


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42048/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818285914


   **[Test build #137245 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137245/testReport)** for PR 32136 at commit [`ae9a8cb`](https://github.com/apache/spark/commit/ae9a8cb78635386a3c57ef71e27138196e671f2f).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-823921792


   I implemented the task scheduling plugin specially for stateful operators in Structured Streaming in last few days. In the experiments, it successfully keeps the state store location fixed across micro-batches and also distributes state store locations as expected. I will create another PR of the implementation as a reference.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-832699455


   Personally for me, I'm hesitant about putting a plugin at this point without making sure it can address other use cases.  even if its a developer plugin, if I go remove or change it, I assume that breaks your usage.  
   
   It seems like the only case you are trying to solve here is streaming and looking at the reference PR is the main difference is you want to evenly distribute tasks for the first batch?  I guess in the above comments you hint at needing it for other stages as well or its hard to configure locality to be long enough, but I don't understand how the wait time differs when spark locality does it vs this plugin does it. I guess you state for other stages that are running that don't have the stateful store are affected, does that just mean we want locality to be controlled at a stage level (does it fit into stage level scheduling)?  What locality is it using for those other stages?   I'm curious is this using the old locality algorithm or the new one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-844858158


   > However I'm not sure if stage level scheduling can deal with executor lost case. Based on above comment, seems it cannot. That will be a major concern for the use-case here. During the task scheduling, once an executor is lost, we may need the scheduler be able to re-schedule the task to a particular executor (e.g. reused PVC in our case).
   
   So what if the state store resource is **required** not **optional**? It means, the task won't launch until getting the required state store. So in your PVC case, the task will wait until it re-mount to some executors. And if we make state store resource required, we should do a similar thing for the HDFS state store on executor lost. For example, we should reconstruct the state store on other active executors so that the state store resources always exist and scheduling won't hang. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818365180


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137245/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846687139


   My major point is about the characteristic of the checkpoint location.
   
   We require checkpoint location to be "fault-tolerant" including hardware failures (local storage doesn't make sense here), and provide "high availability" by itself so that Spark can delegate such complexity to the checkpoint location. For sure, such requirement leads underlying file system to be heavy and non-trivial to maintain, but IMHO that's not an enough reason to take the complexity back to Spark, because:
   
   1. Spark is already quite complicated
   
   Nowadays it's quite uneasy to bring a major changes without affecting existing behaviors. Personally I'd like to see the mainline of Apache Spark as ensuring majority's demands, not everyone's demands.
   
   2. The industry on file systems (or alike) also makes improvements
   
   Many end users were trying to deal with checkpoint location in object stores despite of eventual consistency of S3, and it even became strong consistency. Object stores in Azure had been providing strong consistency if I understand correctly. Not 100% sure of GCS but I heard they claim strong consistency.
   
   HDFS has been exposing several shortcomings so far, but the community is also making improvements like Apache Ozone.
   
   3. Spark community even didn't try to optimize the path
   
   I'd interpret the reasons as two folds:
   
   A. Majority of real-world workloads are working well with current technology
   B. Some workloads don't work well, but no strong demand on this as possible issues are tolerable
   
   I think we still don't struggle for this enough. There're still spots to reduce the latency down, like I did on optimizing WAL commit phase (#31495). I'd like to put my efforts on helping majority's use cases.
   
   > Technically, PVC is kinds of abstract way to look at the volume mounted on container running executor. It could be local storage on nodes on k8s. It depends where the PVC is bound to.
   
   PVC is not a kind of abstraction which "guarantees" fault-tolerant file system, so it still has to depend on the actual file system under the hood, and also the guarantees on interface accessing file system. I imagine the operational costs on making PVC guarantee such requirements would be non-trivial as well.
   
   > Using PVC as checkpoint could be huge relief on the loading of HDFS. There are also others like better latency, simplified streaming architecture.
   
   I'd be happy to see the overall system design and the result of POC. Let's continue the talk about PVC once we get the details.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824182278


   **[Test build #137739 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137739/testReport)** for PR 32136 at commit [`29c0b6b`](https://github.com/apache/spark/commit/29c0b6b6495fa4366129f0a086d72deed8a985ca).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821941288


   **[Test build #137520 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137520/testReport)** for PR 32136 at commit [`7704499`](https://github.com/apache/spark/commit/77044990bcad603b750f9a3ed2c484fd28da7287).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820986926


   **[Test build #137473 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137473/testReport)** for PR 32136 at commit [`1178309`](https://github.com/apache/spark/commit/11783096183f7338f496b299b27935acc35e5097).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826133080






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845123480


   
   For the stage level scheduling option, is the state store essentially the same across all executors?  meaning I don't care that a task gets on a particular executor that exactly matches an instance of a state store, but any executor as long as it has a state store.  Sorry here, my knowledge on the streaming is lacking.
   How is state store reconstructed when executor lost?  I assume its when a streaming task is assigned and the executor is missing the state store and not automatically on executor lost?
   
   > While stage level scheduling solving the "must run at a particular executor" problem, the problem of unenvely distribution of the first batch still exits. I don't have a good idea yet. But I think we can add hack code in scheduler anyway (e.g., we can add the strategy as you added in the the #32422) as long as we know its the 1st batch.
   
   Just thinking out loud,  if we still have to do something in the scheduler as plugin and that fulfills both requirements then why have 2 solutions (1 stage level scheduling and 1 scheduler plugin). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818452185


   cc @mridulm and @tgravescs FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826133080


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42417/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-833680616


   > I don't understand this. if the executor is lost isn't your state store lost? if its just on the host separate from executor, then host locality should still apply. Or are you referring to some new checkpoint mechanism where after a failure you would want the locality to change to where that checkpoint is?
   
   For example, we let state store checkpoint to a persistent volume on the executor, instead of HDFS. Once the executor is lost, we are able to mount the pv back on a new executor or another existing executor. Now, we don't want the state store to be assigned to any executor, but the executor with the pv with checkpointed state.
   
   BTW, how locality deals with executor lost? If I understand correctly, doesn't a long locality (e.g. 10h) mean Spark will hold the task waiting for the lost executor until locality wait finishes?
   
   > I'm definitely not against improving it or scheduling in general, and I never said that. I'm hesitant about the current proposal and implementation. I want clear goals and use cases it applies to, in order to make sure its implemented and solved in the proper way. I think my biggest complaint here is there is no complete overview or details, this kind of has piece meal explanations that when I look at seem to have holes in it, so I don't have a complete picture. I'm not sure this is the best place to plugin at if you really want flexibility.
   
   Sorry if I didn't state clearly the goals and our use-cases, although I thought I already explained above during the discussion. Let me reclaim it:
   
   We need the ability to have tasks (basically stateful tasks matter to us) scheduled on the executors we want. In other words, we need the ability to control where a task is scheduled to. Our goal is to improve the SS state store scheduling and checkpoint. Currently users can only rely on locality. However, locality has first-batch issue (explained in previous comment). Also, relying on user-facing config to solve platform problem seems a flaky approach. Not every user knows to set it.
   
   So it is clear that our use-case is for SS jobs with state store. One use-case of checkpoint with pv is stated at the beginning.
   
   The use-case is specific to SS. But, as Spark doesn't have scheduling plugin/API, we cannot limit the change only to SS module.
   
   > For instance I asked about this plugging in after locality and you said "If users don't want locality to take effect, it is doable by disabling locality configs". But this disables it for the entire job. If you just want spread on the first stage for instance and then everything after that to have locality, that doesn't work.
   
   Let's revisit the discussion:
   
   You question:
   > 1. what happens with locality? it looks like this is plugged in after locality, are you disabling locality then or it doesn't have any for your use case?   if we create a plugin to choose location I wouldn't necessarily want locality to take affect.
   
   Because you said if we create a plugin, you wouldn't want locality to take affect. So my answer is, you can disable the locality, if you doesn't want locality to take affect.
   
   I hope that can clarify it. Let me know if I misunderstand your questions.
   
   BTW, we also let stateful tasks only use plugin or locality, not both. If your question is, we don't want locaility work on stateful tasks but still want it for others.
   
   > so I know there is a doc associated with this, but I think it's not complete enough. I think it should go into more detail about specific problem, why locality isn't sufficient (or where locality isn't sufficient (first stage)), how those things interact, what all use cases this applies to, what use cases it doesn't solve or deficiencies in doing it this way. how does this really flow with SS in the different cases where executor lost and state store, explain it for someone who might not be familiar with it. @mridulm brought up a bunch of issues above in his comment as well.
   
   Okay. Let me enrich the doc with more details. I thought not to put too many SS stuffs in the doc, let me add them. Thanks for the suggestions.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826148020


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137892/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-823921792


   I implemented the task scheduling plugin specially for stateful operators in Structured Streaming in last few days. In the experiments, it successfully keeps the state store location fixed across micro-batches. I will create another PR of the implementation as a reference.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821926941


   **[Test build #137520 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137520/testReport)** for PR 32136 at commit [`7704499`](https://github.com/apache/spark/commit/77044990bcad603b750f9a3ed2c484fd28da7287).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826506510


   As I mentioned in the doc, are we are trying to retrofit scenarios that Spark is not trying to handle ? Namely: some task for some stage must only run on a particular executor and not run anywhere else.
   I agree with @cloud-fan that there are too many interacting aspects that need to be carefully looked at here (resource allocation, fault tolerance, utilization, infinite wait for schedule, etc).
   
   On other hand, the usecase @tgravescs mentioned is an interesting one - how to change schedule behavior towards specific resource usage patterns : like bin-packing executors, etc. I think there have been past PR's towards that (particularly in context of elastic cloud env)
   Those require a global view to make decisions though, not just for a single executor.
   
   Making task scheduling pluggable would be an interesting experiment, but this has to be approached carefully given the interactions. Also, from an interface point of view, we want to ensure it is not specific to a single usecase.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824009507


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137729/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821944239


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137520/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820040504


   > Yes, that's true. But normally, even in the case of offering a single resource that released from a single task, it seems it's less possible to schedule tasks unevenly unless the resources are really scarce.
   
   As I saw in previous tests, it is by chance to have all tasks are evenly distributed to all executors. Sometimes it is, but sometimes only partial executors are scheduled at the first batch.
    
   > Do you have logs related to the scheduling? I'd like to see how it happens.
   
   I don't have logs now. It is a general and simple SS job reading from Kafka.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824443949


   **[Test build #137751 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137751/testReport)** for PR 32136 at commit [`d2a2b46`](https://github.com/apache/spark/commit/d2a2b46c5b3b68a30cddc43e9246e610c52e90f3).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824424654


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42278/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818756850


   I would like to see more overview and design details.  I think the idea of having something here is good because some people may want to cluster tasks, while some might want to spread them. You might want to place them based on hardware or something else.  I want to understand how flexible this plugin api you are proposing it.
   
   I just saw https://docs.google.com/document/d/1wfEaAZA7t02P6uBH4F3NGuH_qjK5e4X05v1E5pWNhlQ/edit# which has a few details.  Would be good to link from description. 
   
   questions:
   1) what happens with locality? it looks like this is plugged in after locality, are you disabling locality then or it doesn't have any for your use case?   if we create a plugin to choose location I wouldn't necessarily want locality to take affect.
   2)  @param tasks The full list of tasks => this is all tasks even if done?  Would you want to know which ones are running already or which was have succeeded
   3) this is being called from synchronized block, in the very least we need to document better and affects it could have on scheduling time
   4) it looks like your plugin runs before blacklisting, is this really what we want or would plugin like to know to make better decision?
   5) how does this interact with barrier where it resets things if it doesn't get scheduled?
   
   
   I would like to see how this applies to other use cases I mentioned above before putting this in.
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818274820


   May need to have more test cases. But I'd like to ask the opinions from the community first.
   
   cc @cloud-fan @Ngone51 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819097078


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41887/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824254589


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42268/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824248555


   Kubernetes integration test unable to build dist.
   
   exiting with code: 1
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42268/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818309043


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41825/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-847649737


   > Which check are you referring to?
   
   There is assertion that dynamic allocation should be enabled under stage-level scheduling. I mean if we remove such assertion, will it affect normal cases of stage-level scheduling?
   
   > @tgravescs yes the target might be archieved directly with the mapping. But I think that's would be the last choice as the community wants to introduce less invading changes when working across the modules. And that's the reason that @viirya proposed plugin APIs first and we're now discussing the possibility of reusing existing feature - stage level scheduling.
   But if we still have to introduce many invading changes (e.g., the mapping) even if reusing the stage level scheduling, I think we should revisit our decision.
   
   That's correct as this is one major point behind this API. It is proposed to keep as separate as possible to not affect Spark other than SS code path. Personally I'd refer a separate design. You may know more about how much invading change we need to support the use-case in stage-level scheduling. Let me know if you need to revisit the decision so I can follow up with the direction. I'll be willing to continue on this API or turn to stage-level scheduling with some changes if you think it is better.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821013742






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821088567


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137473/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-847498828


   > So, does it mean I can remove the dynamic allocation check for our case without affecting classic stage-level scheduling?
   
   Which check are you referring to?
   
   >> This means that if an executor goes down it has to wait for something else on executor to start up the task specific
   >>state store - what is going to do that in this scenario? Or you wait a certain period and schedule it anywhere.
   >
   >I think this could be configurable. Users can configure it to schedule on anywhere and load from checkpointed data, or in other case just fail the streaming app, after a certain period.
   
   Yes. Besides, in the Spark default use case, we can also move the state store resources to other active executors (`ExecutorData`). And tasks would just reconstruct them at the executor later.
   
   
   >>I think this would mean scheduler would have some specific logic to be able to match task id to state store id, right? Otherwise stage level scheduling would schedule a task on anything in that list., which seems like at that point makes a list not relavent if Spark knows how to do some sort of mapping.
   >
   >Hmm, @Ngone51, is it true? For other resources like GPUs, it makes sense but in this case we need specific (task id)-(resource id, e.g. state store id, PVC claim name, etc.) bound.
   
   That's true. We need the mapping. I thought about the using the exiting task info (e.g., prtitionId) should be enough to match the right state store but looks wrong. We'd have to add the extra info for the mapping.
   
   > which seems like at that point makes a list not relavent if Spark knows how to do some sort of mapping.
   
   @tgravescs yes the target might be archieved directly with the mapping. But I think that's would be the last choice as the community wants to introduce less invading changes when working across the modules. And that's the reason that @viirya proposed plugin APIs first and we're now discussing the possibility of reusing existing feature - stage level scheduling.
   But if we still have to introduce many invading changes (e.g., the mapping) even if reusing the stage level scheduling, I think we should revisit our decision.
   
   
   > it seems like this feature could existing outside creating a new ResourceProfile with the stage level scheduling api's and user should be able to specify this option that would only work with stateStoreRDD. Is it useful outside of that? I don't see how unless we added another plugin point for executor to report back any resource and then come up with some api that it could call to do the mapping of taskId to resourceId reported back to it.
   
   I think it's only used for `StateStoreRDD` yet. BTW, it seems not possible for end users to specify the resource request by themselves as streaming uses the DataFrame API and `StateStoreRDD` hides from it.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-844858158


   > However I'm not sure if stage level scheduling can deal with executor lost case. Based on above comment, seems it cannot. That will be a major concern for the use-case here. During the task scheduling, once an executor is lost, we may need the scheduler be able to re-schedule the task to a particular executor (e.g. reused PVC in our case).
   
   So what if the state store resource is **required** not **optional**? It means, the task won't launch until getting the required state store. So in your PVC case, the task will wait until it re-mount to some executors. And if we make state store resource **required***, we should do a similar thing for the HDFS state store on executor lost. For example, we should reconstruct the state store on other active executors so that the state store resources always exist and scheduling won't hang. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819136140


   **[Test build #137307 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137307/testReport)** for PR 32136 at commit [`3097e73`](https://github.com/apache/spark/commit/3097e732bb9a0d33fa090f0ff13c9a5ef752dbcc).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826146608


   **[Test build #137892 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137892/testReport)** for PR 32136 at commit [`173bb07`](https://github.com/apache/spark/commit/173bb07a9251141f7081ed8cd60c9adae3c566bd).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819145361


   > a) In the case of streaming workloads, I think the locality info here is about the state store instead of data. e.g.,
   > So I think that locality preference scheduling (or delay scheduling) would also apply to it (the state store location).
   > 
   > b) That being said, I actually had the same concern when I toughed the streaming code. Because I know that delay scheduling doesn't guarantee the final scheduling location to be the preferred location provided by the task. So, the cost of reloading statestore would still exist potentially.
   
   Let me figure the difference between a and b. So (a) looks like using locality for state store location and (b) is that locality cannot guarantee actual location. Right? Please let me know if I misunderstand.
   
   I have tried to use locality for tasks with state stores in #30812. As you know (in the code snippet), actually SS already uses locality for state store location. However it has a few problems: 1. it uses previous state store location as locality, so if no previous location info, we still let Spark pick up executor arbitrarily. 2. It depends on initial chosen executor-state store mapping. So if Spark choose a sub-optimal mapping, locality doesn't work well for later batches. 3. Forcibly assigning state stores to executors can possibly lead to unreasonable scheduling decision. For example, we don't know if the executor satisfy resource requirement.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845745347


   > Not sure which point you say "No" to..IIUC, I think stage level scheduling should work for your use case as long as you would let the driver know when the PVC re-mounts to another new executor (e.g., just reports the state stores ids related to that PVC).
   
   Oh, I mean we don't use HDFS for state store reconstruction.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818305372


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41825/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-833680616


   > I don't understand this. if the executor is lost isn't your state store lost? if its just on the host separate from executor, then host locality should still apply. Or are you referring to some new checkpoint mechanism where after a failure you would want the locality to change to where that checkpoint is?
   
   For example, we let state store checkpoint to a persistent volume on the executor, instead of HDFS. Once the executor is lost, we are able to mount the pv back on a new executor or another existing executor. Now, we don't want the state store to be assigned to any executor, but the executor with the pv with checkpointed state.
   
   > I'm definitely not against improving it or scheduling in general, and I never said that. I'm hesitant about the current proposal and implementation. I want clear goals and use cases it applies to, in order to make sure its implemented and solved in the proper way. I think my biggest complaint here is there is no complete overview or details, this kind of has piece meal explanations that when I look at seem to have holes in it, so I don't have a complete picture. I'm not sure this is the best place to plugin at if you really want flexibility.
   
   Sorry if I didn't state clearly the goals and our use-cases, although I thought I already explained above during the discussion. Let me reclaim it:
   
   We need the ability to have tasks (basically stateful tasks matter to us) scheduled on the executors we want. In other words, we need the ability to control where a task is scheduled to. Our goal is to improve the SS state store scheduling and checkpoint. Currently users can only rely on locality. However, locality has first-batch issue (explained in previous comment). Also, relying on user-facing config to solve platform problem seems a flaky approach. Not every user knows to set it.
   
   So it is clear that our use-case is for SS jobs with state store. One use-case of checkpoint with pv is stated at the beginning.
   
   The use-case is specific to SS. But, as Spark doesn't have scheduling plugin/API, we cannot limit the change only to SS module.
   
   > For instance I asked about this plugging in after locality and you said "If users don't want locality to take effect, it is doable by disabling locality configs". But this disables it for the entire job. If you just want spread on the first stage for instance and then everything after that to have locality, that doesn't work.
   
   Let's revisit the discussion:
   
   You question:
   > 1. what happens with locality? it looks like this is plugged in after locality, are you disabling locality then or it doesn't have any for your use case?   if we create a plugin to choose location I wouldn't necessarily want locality to take affect.
   
   Because you said if we create a plugin, you wouldn't want locality to take affect. So my answer is, you can disable the locality, if you doesn't want locality to take affect.
   
   I hope that can clarify it. Let me know if I misunderstand your questions.
   
   > so I know there is a doc associated with this, but I think it's not complete enough. I think it should go into more detail about specific problem, why locality isn't sufficient (or where locality isn't sufficient (first stage)), how those things interact, what all use cases this applies to, what use cases it doesn't solve or deficiencies in doing it this way. how does this really flow with SS in the different cases where executor lost and state store, explain it for someone who might not be familiar with it. @mridulm brought up a bunch of issues above in his comment as well.
   
   Okay. Let me enrich the doc with more details. Thanks for the suggestions.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820828437


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42023/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819138256


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137307/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819075094


   **[Test build #137307 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137307/testReport)** for PR 32136 at commit [`3097e73`](https://github.com/apache/spark/commit/3097e732bb9a0d33fa090f0ff13c9a5ef752dbcc).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819138256


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/137307/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-847209424


   > It's required for the classic use case (when you really need to change executor resources dynamically) but not this case.
   > 
   > In this case, we only leverage the task scheduling characteristic of the stage level scheduling. Thus, adding the task request for the state store resource only should be enough.
   
   Good to know that. So, does it mean I can remove the dynamic allocation check for our case without affecting classic stage-level scheduling?
   
   > This means that if an executor goes down it has to wait for something else on executor to start up the task specific
   state store - what is going to do that in this scenario? Or you wait a certain period and schedule it anywhere.
   
   I think this could be configurable. Users can configure it to schedule on anywhere and load from checkpointed data, or in other case just fail the streaming app, after a certain period. 
   
   > I think this would mean scheduler would have some specific logic to be able to match task id to state store id, right? Otherwise stage level scheduling would schedule a task on anything in that list., which seems like at that point makes a list not relavent if Spark knows how to do some sort of mapping.
   
   Hmm,  @Ngone51, is it true? For other resources like GPUs, it makes sense but in this case we need specific (task id)-(resource id, e.g. state store id, PVC claim name, etc.) bound.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-844836740


   > This sounds feasible to me. We can treat the state store as a resource for the streaming task. And since the `StateStoreProviderId` is shared between batches, tasks between batches must be assigned the same state store as long as they require the same `StateStoreProviderId` (which is guaranteed by the stage level scheduling mechanism). Here's pseudo code may look like:
   > On the other side, driver should be able to update `ExecutorData.resourcesInfo` when `StateStoreCoordinatorRef` receives the active state store instance register so that the executor would contain the state store resource.
   
   Thanks @xuanyuanking and @Ngone51.
   
   This might be a feasible direction to unblock this. Let me think about it and maybe POC it locally.
   
   Roughly looking at it, stage level scheduling can deal with particular task to particular executor issue. This looks okay. 
   
   #32422 is based on the API added here. So there is a hook/API or something that we can rely to change the scheduler behavior for first batch. Specifically, it is a streaming concept and so without an API/hook there it will be ugly to add hack code into the scheduler to fit this need. We can discuss this, of course.
   
   However I'm not sure if stage level scheduling can deal with executor lost case. Based on above comment, seems it cannot. That will be a major concern for the use-case here. During the task scheduling, once an executor is lost, we may need the scheduler be able to re-schedule the task to a particular executor (e.g.  reused PVC in our case).
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845734433


   > So what if the state store resource is **required** not **optional**? It means, the task won't launch until getting the required state store. So in your PVC case, the task will wait until it re-mount to some executors. And if we make state store resource required, we should do the similar thing for the HDFS state store on executor lost. For example, we should reconstruct the state store on other active executors (or even we don't have to reconstruct the state store in reality but move the `StateStoreProviderId`s to other active executors' metadata (e.g., ExecutorData) should be enogh) so that the state store resources always exist and scheduling won't hang.
   
   No. In our use-case, we want to get rid of HDFS for state store checkpoint. So the task will wait until the PVC re-mounts to another new executor. Our state store is checkpointed to PVC, not HDFS.
   
   That is why I question about if stage level scheduling can handle such case. Because it is one of requirements of this proposed plugin API.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819097991


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41887/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] tgravescs commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-821191952


   > IIUC, the plugin does not affect or change how the scheduler acts on barrier tasks. In the current dequeue logic, the scheduler doesn't have different behavior on barrier task/general task. For now if the scheduler cannot schedule all barrier tasks at once, it will reset the assigned resource offers. It is the same with the plugin.
   
   I didn't see an API for this?  informScheduledTask is called at scheduling so my assumption is the plugin may be keeping state about where things have went and if things reset I would have expected an API call to let the plugin know. in TaskSchedulerImpl.resourceOffers it may have assigned some tasks but then if it doesn't get all needed for barrier then it resets them.  Maybe your intention is that it doesn't keep state?  The intention is that api would just throw?
   
   > That is correct. However, even for not first micro-batch, we currently use preferred location + non-trivial locality config (e.g., 10h) to force Spark schedule tasks to previous locations. I think it is not flexible because locality is a global setting. A non-trivial locality config might cause sub-optimal result for other stages
   
   so I don't completely understand this.  Are you just saying that the locality is not specific enough? I get the first micro-batch case kind of especially perhaps in the dynamic allocation type case - is that the case here, seems like you kind of hint at it above in a comment,  but don't understand in other cases.  Have you tried the newer locality algorithm vs the old one? 
   
   Does this come down to you really just want scheduler to force evenly distributed and then after that locality should work? It seems like you are saying it needs more then that though and locality isn't enough, would like to understand why.
   
   > non-trivial locality config (e.g., 10h)
   
   I'm not sure what that means?  do you just mean it has more logic in figuring out the locality?
   
   Overall I'm fine with having some sort of a plugin to allow people to experiment but I also want it generic enough to cover the cases I mentioned and for it to not cause problems where people can shoot themselves and then complain as to why things aren't working.  It would be nice to really understand this case to see if that is needed or if just something else can be improved for all people benefit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r633701474



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       As I replied on the doc too, locality is static. During the assignment of task set, locality is static. I also considered this option, e.g. dynamically change locality during task scheduling, but I think the change will be more, and it's more easily to affect current code/app.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       > If it is not to be exposed, then why make it a plugin? Isn't the risk that you are now overgeneralizing something for a non-existent benefit?
   
   I agree now the class name might be confusing. I thought it as a plugin. Now it is not a plugin you can plug into Spark but a private API.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824394328


   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-823944692


   **[Test build #137729 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137729/testReport)** for PR 32136 at commit [`29c0b6b`](https://github.com/apache/spark/commit/29c0b6b6495fa4366129f0a086d72deed8a985ca).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-857475592


   @tgravescs  what's your opinion on the `StateStoreTaskLocation` proposal?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826631839


   Shall we have a chance to look into the case you have resolved and how you do that? It would be easier for you to persuade others via showing exactly what is happening and how this will help. Providing actual implementation would be one of, and if you like to do then I'll wait for that instead before looking into this. During investigation on real world problem, we might be able to find alternatives and compare them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826124648


   **[Test build #137892 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137892/testReport)** for PR 32136 at commit [`173bb07`](https://github.com/apache/spark/commit/173bb07a9251141f7081ed8cd60c9adae3c566bd).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-826124648






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-823978363


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42256/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-846329353


   I'm not sure about the scenario of leveraging PVC as checkpoint location - at least that sounds to me as beyond the support of checkpoint in Structured Streaming.
   
   We have been clearly describing about the requirement of checkpoint location in Structured Streaming guide page, like following:
   
   > Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
   
   I know we allow custom checkpoint manager implementations to deal with non-HDFS compatible file system (like object stores which don't provide "atomic rename"), but they still deal with "remote" "fault-tolerant" file system, and doesn't require Spark scheduler to schedule specific task to specific executor based on the availability of checkpoint.
   
   In other words, only checkpoint manager handles the complexity of checkpoint on file system, not somewhere else. And sounds like it's no longer holding true if we want to support PVC based checkpoint. Please correct me if I'm missing something.
   
   I'm more likely novice on cloud/k8s, but from the common sense, I guess the actual storage of PVC should be still a sort of network storage to be resilient on "node down". I'm wondering how much benefits PVC approach gives compared to the existing approach as just directly use remote fault-tolerant file system. The benefits should be clear to cope with additional complexity.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #32136:
URL: https://github.com/apache/spark/pull/32136#discussion_r633701474



##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulingPlugin.scala
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * This trait provides a plugin interface for suggesting task scheduling to Spark
+ * scheduler.
+ */
+private[spark] trait TaskSchedulingPlugin {

Review comment:
       As I replied on the doc too, locality is static. During the assignment of task set, locality is static. I also considered this option, e.g. dynamically change locality during task scheduling, but I think the change will be more, and it's more easily to affect current code/app.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-818723560


   > to avoid Spark schedule streaming tasks which use state store (let me call them stateful tasks) to arbitrary executors.
   
   I don't think we can guarantee it. It's a best effort and tasks should be able to run on any executor, thought tasks can have preferred executors (locality). Otherwise, we need to revisit many design decisions like how to avoid infinite wait, how to auto-scale, etc.
   
   > current locality seems a hacky approach as we can just blindly assign stateful tasks to executors evenly.
   
   Can you elaborate? If it's a problem of delay scheduling let's fix it instead.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-824401633


   **[Test build #137751 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137751/testReport)** for PR 32136 at commit [`d2a2b46`](https://github.com/apache/spark/commit/d2a2b46c5b3b68a30cddc43e9246e610c52e90f3).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-845745942


   > 
   <div class="js-discussion js-socket-channel ml-0 pl-0 ml-md-6 pl-md-3" data-channel="eyJjIjoibWFya2VkLWFzLXJlYWQ6MTYzOTcxNzQiLCJ0IjoxNjIxNTgzNTQzfQ==--6fa852c012afa82000dfc33c77461164153e5dd7f69a98ee63d2c88c9b6e742d" data-channel-target="MDExOlB1bGxSZXF1ZXN0NjEzOTgwMTM3" style="box-sizing: border-box; margin-left: 40px !important; padding-left: 16px !important; color: rgb(36, 41, 46); font-family: -apple-system, system-ui, &quot;Segoe UI&quot;, Helvetica, Arial, sans-serif, &quot;Apple Color Emoji&quot;, &quot;Segoe UI Emoji&quot;; font-size: 14px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;"><div class="js-timeline-it
 em js-timeline-progressive-focus-container" data-gid="MDEyOklzc3VlQ29tbWVudDg0NTc0NTM0Nw==" style="box-sizing: border-box;"><div class="TimelineItem js-comment-container" data-gid="MDEyOklzc3VlQ29tbWVudDg0NTc0NTM0Nw==" data-url="/_render_node/MDEyOklzc3VlQ29tbWVudDg0NTc0NTM0Nw==/timeline/issue_comment?variables%5BdeferredCommentActions%5D=false" style="box-sizing: border-box; position: relative; display: flex; padding: 16px 0px; margin-left: 16px;"><div class=" timeline-comment-group js-minimizable-comment-group js-targetable-element TimelineItem-body my-0 " id="issuecomment-845745347" style="box-sizing: border-box; margin-top: 4px; margin-bottom: 0px !important; min-width: 0px; max-width: 100%; color: var(--color-timeline-text); flex: 1 1 auto;"><div class="ml-n3 timeline-comment unminimized-comment comment previewable-edit js-task-list-container editable-comment js-comment timeline-comment--caret reorderable-task-lists" data-body-version="6f5dfd553ccc8e0ba2747426cf11be7b7694192baf
 d595ad23ef8f8bd48fa182" data-unfurl-hide-url="/content_reference_attachments/hide" style="box-sizing: border-box; margin-left: -16px !important; position: relative; color: var(--color-text-primary); background-color: var(--color-bg-primary); border: 1px solid var(--color-border-primary); border-radius: 6px;"><div class="edit-comment-hide" style="box-sizing: border-box;"><task-lists sortable="" style="box-sizing: border-box;">Oh, I mean we don't use HDFS for state store reconstruction.</task-lists></div><form class="js-comment-update" id="issuecomment-845745347-edit-form" action="https://github.com/apache/spark/issue_comments/845745347" accept-charset="UTF-8" method="post" style="box-sizing: border-box;"></form></div></div></div></div><div id="partial-timeline" class="js-timeline-marker js-socket-channel js-updatable-content" data-channel="eyJjIjoicHVsbF9yZXF1ZXN0OjYxMzk4MDEzNyIsInQiOjE2MjE1ODQwOTV9--b2412dd2c1813a96fb304082ebf76be20b861af1aedf24ca35711483af330106" data-url="/_render
 _node/MDExOlB1bGxSZXF1ZXN0NjEzOTgwMTM3/pull_requests/unread_timeline?variables%5BdeferCommitBadges%5D=false&amp;variables%5BdeferStatusCheckRollups%5D=false&amp;variables%5BdeferredCommentActions%5D=true&amp;variables%5BhasFocusedReviewComment%5D=false&amp;variables%5BhasFocusedReviewThread%5D=false&amp;variables%5BtimelinePageSize%5D=30&amp;variables%5BtimelineSince%5D=2021-05-21T08%3A01%3A32Z" data-last-modified="Fri, 21 May 2021 08:01:32 GMT" data-gid="MDExOlB1bGxSZXF1ZXN0NjEzOTgwMTM3" style="box-sizing: border-box;"></div></div><div class="discussion-timeline-actions" style="box-sizing: border-box; background-color: var(--color-bg-canvas); border-top: 2px solid var(--color-border-primary); color: rgb(36, 41, 46); font-family: -apple-system, system-ui, &quot;Segoe UI&quot;, Helvetica, Arial, sans-serif, &quot;Apple Color Emoji&quot;, &quot;Segoe UI Emoji&quot;; font-size: 14px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter
 -spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial;"><div id="partial-pull-merging" class="pull-merging js-pull-merging js-socket-channel js-updatable-content js-pull-refresh-on-pjax " aria-live="polite" data-channel="eyJjIjoicmVwbzoyMTQ2NzkwNzpicmFuY2g6U1BBUkstMzUwMjIiLCJ0IjoxNjIxNTgzOTc3fQ==--789c530f63163c0b7c8bc5c867b67b0ba09b81777ee86f4aa1c91223d834f15f eyJjIjoicmVwbzoxNzE2NTY1ODpicmFuY2g6bWFzdGVyIiwidCI6MTYyMTU4Mzk3N30=--960c892b2590716eefd2894a57a18d355a1af0508ab60c849ac8bcab8d3e3a24 eyJjIjoicmVwbzoxNzE2NTY1ODpjb21taXQ6MTczYmIwN2E5MjUxMTQxZjcwODFlZDhjZDYwYzlhZGFlM2M1NjZiZCIsInQiOjE2MjE1ODM5Nzd9--390e3c952019b8b994ea9f8531e1e98b3dbc17774140e03d8aa11c62185fbef0 eyJjIjoiaXNzdWU6ODU2Mzk2MTUyOnN0YXRlIiwidCI6MTYyMTU4Mzk3N30=--17028907c4ca30ac7fd6132dfeed15fcb875832b
 e075d6c832735b9f5952674f eyJjIjoicHVsbF9yZXF1ZXN0OjYxMzk4MDEzNzpyZXZpZXdfc3RhdGUiLCJ0IjoxNjIxNTgzOTc3fQ==--773202746fb365ab1aa5e7e6623e83484427b47747296dc8ce0ab5717e19167f eyJjIjoicHVsbF9yZXF1ZXN0OjYxMzk4MDEzNzp3b3JrZmxvd19ydW4iLCJ0IjoxNjIxNTgzOTc3fQ==--1b8fb2ef491cf98d46b2e670f2969ffe82581862c8639301693d28413a96d305" data-url="/apache/spark/pull/32136/show_partial?merge_type=squash&amp;partial=pull_requests%2Fmerging" style="box-sizing: border-box;"><br class="Apple-interchange-newline"></div></div>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-819341329


   Correct me if I'm wrong: Spark tries its best to schedule SS tasks on executors that have existing state store data. This is already the case and is implemented via the preferred location. The problem we are solving here is the first micro-batch, where there is no existing state store data and we want to schedule the tasks of the first micro-batch evenly on the cluster. This is to avoid skews in the future that many SS tasks are running on very few executors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #32136: [WIP][SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-820986926


   **[Test build #137473 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137473/testReport)** for PR 32136 at commit [`1178309`](https://github.com/apache/spark/commit/11783096183f7338f496b299b27935acc35e5097).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya edited a comment on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-833680616


   > I don't understand this. if the executor is lost isn't your state store lost? if its just on the host separate from executor, then host locality should still apply. Or are you referring to some new checkpoint mechanism where after a failure you would want the locality to change to where that checkpoint is?
   
   For example, we let state store checkpoint to a persistent volume on the executor, instead of HDFS. Once the executor is lost, we are able to mount the pv back on a new executor or another existing executor. Now, we don't want the state store to be assigned to any executor, but the executor with the pv with checkpointed state.
   
   BTW, how locality deals with executor lost? If I understand correctly, doesn't a long locality (e.g. 10h) mean Spark will hold the task waiting for the lost executor until locality wait finishes?
   
   > I'm definitely not against improving it or scheduling in general, and I never said that. I'm hesitant about the current proposal and implementation. I want clear goals and use cases it applies to, in order to make sure its implemented and solved in the proper way. I think my biggest complaint here is there is no complete overview or details, this kind of has piece meal explanations that when I look at seem to have holes in it, so I don't have a complete picture. I'm not sure this is the best place to plugin at if you really want flexibility.
   
   Sorry if I didn't state clearly the goals and our use-cases, although I thought I already explained above during the discussion. Let me reclaim it:
   
   We need the ability to have tasks (basically stateful tasks matter to us) scheduled on the executors we want. In other words, we need the ability to control where a task is scheduled to. Our goal is to improve the SS state store scheduling and checkpoint. Currently users can only rely on locality. However, locality has first-batch issue (explained in previous comment). Also, relying on user-facing config to solve platform problem seems a flaky approach. Not every user knows to set it.
   
   So it is clear that our use-case is for SS jobs with state store. One use-case of checkpoint with pv is stated at the beginning.
   
   The use-case is specific to SS. But, as Spark doesn't have scheduling plugin/API, we cannot limit the change only to SS module.
   
   > For instance I asked about this plugging in after locality and you said "If users don't want locality to take effect, it is doable by disabling locality configs". But this disables it for the entire job. If you just want spread on the first stage for instance and then everything after that to have locality, that doesn't work.
   
   Let's revisit the discussion:
   
   You question:
   > 1. what happens with locality? it looks like this is plugged in after locality, are you disabling locality then or it doesn't have any for your use case?   if we create a plugin to choose location I wouldn't necessarily want locality to take affect.
   
   Because you said if we create a plugin, you wouldn't want locality to take affect. So my answer is, you can disable the locality, if you doesn't want locality to take affect.
   
   I hope that can clarify it. Let me know if I misunderstand your questions.
   
   > so I know there is a doc associated with this, but I think it's not complete enough. I think it should go into more detail about specific problem, why locality isn't sufficient (or where locality isn't sufficient (first stage)), how those things interact, what all use cases this applies to, what use cases it doesn't solve or deficiencies in doing it this way. how does this really flow with SS in the different cases where executor lost and state store, explain it for someone who might not be familiar with it. @mridulm brought up a bunch of issues above in his comment as well.
   
   Okay. Let me enrich the doc with more details. Thanks for the suggestions.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-833845738


   Note: I updated the doc with more details on goals, non-goals, use-case, deficiencies of current solution (locality), etc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on pull request #32136: [SPARK-35022][CORE] Task Scheduling Plugin in Spark

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #32136:
URL: https://github.com/apache/spark/pull/32136#issuecomment-858400265


   > Your rephrase looks good except for one point here. "task (i.e. state store)"? You mean task is kind of a type of state store? is it a typo? I actually expect that it's a mapping between PVC and task Id.
   
   A specific statestore is bound to a task, e.g. task 0 is bound with state store 0. This cannot be changed. But where to schedule the task, could be changed generally. This is current situation for HDFS-backed statestore. In other word, task-statestore is moved together if Spark schedule the task to different executor.
   
   So actually the mapping between PVC and task id, also means a (implicit) mapping between PVC and statestore of the task. That is why I add "i.e." there. Sorry for if any confusing.
   
   > I don't understand this. I assume each statestore must be bound to a specific location. Why we can't schedule the task?
   
   For current HDFS-backed statestore, the location can be changed. It is when Spark schedules the task with the statestore to new executor.
   
   Once it is changed to different executor, Spark will reload from checkpointed data from HDFS to construct the state store in new executor (location).
   
   For resource-specific case (e.g. PVC), the location is fixed generally, because it is bound to specific resource on the executor. But in case like executor lost and the resource is re-mountable. Spark can schedule the task with the statestore to new executor re-mounted with the resource.
   
   > `ResourceLocation` sounds too general. Mabye, `RequiredResourceLocation`?
   
   `RequiredResourceLocation` sounds good to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org