Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/08/18 08:07:24 UTC

[GitHub] [incubator-doris] xy720 opened a new pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

xy720 opened a new pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383


   ## Proposed changes
   
   #4346 #4203 
   This change uses the `yarn` command below to kill an application running on YARN or to query its status.
   
   ```
   yarn --config confdir application <-kill | -status> <Application ID>
   ```
   
   To do
   1. Make the yarn command executable in Spark load.
   2. Write the Spark resource into config files and update them before running the command.
   3. Parse the output of the command.
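
As a rough illustration of item 1, the command line shown above could be assembled as an argument list before being handed to a process launcher. This is only a sketch: `YarnCommandSketch`, `Action`, `build`, and the example paths are hypothetical names and are not taken from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; class, enum, and method names are illustrative only.
final class YarnCommandSketch {
    enum Action { KILL, STATUS }

    // Builds: <yarnClient> --config <configDir> application <-kill | -status> <appId>
    static List<String> build(String yarnClient, String configDir, Action action, String appId) {
        List<String> cmd = new ArrayList<>();
        cmd.add(yarnClient);
        cmd.add("--config");
        cmd.add(configDir);
        cmd.add("application");
        cmd.add(action == Action.KILL ? "-kill" : "-status");
        cmd.add(appId);
        return cmd;
    }

    public static void main(String[] args) {
        // Placeholder paths and application ID, for illustration only.
        List<String> cmd = build("/path/to/yarn", "/path/to/confdir",
                Action.STATUS, "application_1_0001");
        System.out.println(String.join(" ", cmd));
        // Running it needs a yarn client on the machine, for example:
        // Process p = new ProcessBuilder(cmd).redirectErrorStream(true).start();
    }
}
```

Passing the arguments as a list avoids shell quoting problems when the config directory or client path contains spaces.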
   
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [x] Bugfix (non-breaking change which fixes an issue)
   - [x] New feature (non-breaking change which adds functionality)
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [x] I have created an issue on (Fix #ISSUE), and have described the bug/feature there in detail
   - [ ] Compiling and unit tests pass locally with my changes
   - [ ] I have added tests that prove my fix is effective or that my feature works
   - [ ] If this change needs a document change, I have updated the document
   - [ ] Any dependent changes have been merged
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] xy720 commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477165212



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {
+    private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitors.class);
+    // 5min
+    private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000;
+
+    private LogMonitor logMonitor;
+
+    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) {
+        return new LogMonitor(handle);
+    }
+
+    private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
+        switch (yarnState) {
+            case SUBMITTED:
+            case ACCEPTED:
+                return SparkLoadAppHandle.State.SUBMITTED;
+            case RUNNING:
+                return SparkLoadAppHandle.State.RUNNING;
+            case FINISHED:
+                return SparkLoadAppHandle.State.FINISHED;
+            case FAILED:
+                return SparkLoadAppHandle.State.FAILED;
+            case KILLED:
+                return SparkLoadAppHandle.State.KILLED;
+            default:
+                // NEW NEW_SAVING
+                return SparkLoadAppHandle.State.UNKNOWN;
+        }
+    }
+
+    public static class LogMonitor extends Thread {
+        private final Process process;
+        private SparkLoadAppHandle handle;
+        private long submitTimeoutMs;
+        private boolean isStop;
+
+        private static final String STATE = "state";
+        private static final String QUEUE = "queue";
+        private static final String START_TIME = "start time";
+        private static final String FINAL_STATUS = "final status";
+        private static final String URL = "tracking URL";
+        private static final String USER = "user";
+
+        public LogMonitor(SparkLoadAppHandle handle) {
+            this.handle = handle;
+            this.process = handle.getProcess();
+            this.isStop = false;
+        }
+
+        public void setSubmitTimeoutMs(long submitTimeoutMs) {
+            this.submitTimeoutMs = submitTimeoutMs;
+        }
+
+        // Monitor the process's output
+        @Override
+        public void run() {
+            BufferedReader outReader = null;
+            String line = null;
+            long startTime = System.currentTimeMillis();
+            try {
+                Preconditions.checkState(process.isAlive());
+                outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                while (!isStop && (line = outReader.readLine()) != null) {
+                    LOG.info("Monitor Log: " + line);
+                    // parse state and appId
+                    if (line.contains(STATE)) {
+                        SparkLoadAppHandle.State oldState = handle.getState();
+                        SparkLoadAppHandle.State newState = oldState;
+                        // 1. state
+                        String state = regexGetState(line);
+                        if (state != null) {
+                            YarnApplicationState yarnState = YarnApplicationState.valueOf(state);
+                            newState = fromYarnState(yarnState);
+                            if (newState != oldState) {
+                                handle.setState(newState);
+                            }
+                        }
+                        // 2. appId
+                        String appId = regexGetAppId(line);
+                        if (appId != null) {
+                            if (!appId.equals(handle.getAppId())) {
+                                handle.setAppId(appId);
+                            }
+                        }
+
+                        LOG.info("spark appId that handle get is {}, state: {}", handle.getAppId(), handle.getState().toString());
+                        switch (newState) {
+                            case UNKNOWN:
+                            case CONNECTED:
+                            case SUBMITTED:
+                                // If the app stays in the UNKNOWN/CONNECTED/SUBMITTED state for more than submitTimeoutMs
+                                // stop monitoring and kill the process
+                                if (System.currentTimeMillis() - startTime > submitTimeoutMs) {
+                                    isStop = true;
+                                    handle.kill();
+                                }
+                                break;
+                            case RUNNING:
+                            case FINISHED:
+                                // There's no need to parse all logs of handle process to get all the information.
+                                // As soon as the state changes to RUNNING/KILLED/FAILED/FINISHED/LOST,
+                                // stop monitoring but keep the process alive.
+                                isStop = true;
+                                break;
+                            case KILLED:
+                            case FAILED:
+                            case LOST:
+                                // If the state changes to KILLED/FAILED/LOST,
+                                // stop monitoring and kill the process
+                                isStop = true;
+                                handle.kill();
+                                break;
+                            default:
+                                Preconditions.checkState(false, "wrong spark app state");
+                        }
+                    }
+                    // parse other values
+                    else if (line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINAL_STATUS) ||

Review comment:
       State changes follow one of these paths:
   1. `submitted > running > finished / failed`
   2. `submitted > killed`
   3. `submitted > running > killed`
    Normally, the launcher periodically prints the `queue`, `start time`, `final status`, `tracking url`, and `user` logs while the app is in the submitted/running state. So in case 2, the while loop may exit before the else-if block ever runs.
   That is not a serious problem, because the only values we strictly need are `appId` and `state`. In this case, `queue`, `start time`, `final status`, `tracking url`, and `user` are simply missing.
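
To make the two required values concrete, here is a minimal sketch of pulling `appId` and `state` out of one launcher log line. The PR's `regexGetState` and `regexGetAppId` are not shown in this hunk, so the patterns, the class name `LauncherLogParseSketch`, and the sample line are all assumptions about what the launcher prints, not the PR's code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser; the patterns assume lines shaped like
// "... Application report for application_<ts>_<seq> (state: ACCEPTED)".
final class LauncherLogParseSketch {
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");
    private static final Pattern STATE = Pattern.compile("\\(state: ([A-Z_]+)\\)");

    // Returns the first application ID found in the line, or null.
    static String appId(String line) {
        Matcher m = APP_ID.matcher(line);
        return m.find() ? m.group() : null;
    }

    // Returns the state name inside "(state: ...)", or null.
    static String state(String line) {
        Matcher m = STATE.matcher(line);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        String line = "INFO yarn.Client: Application report for "
                + "application_1573630236805_6864759 (state: ACCEPTED)";
        System.out.println(appId(line) + " " + state(line));
    }
}
```

Both methods return `null` when nothing matches, which mirrors the null checks in the `run()` loop above.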






[GitHub] [incubator-doris] xy720 commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477152414



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
##########
@@ -262,19 +273,22 @@ public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long loadJ
         return status;
     }
 
-    public void killEtlJob(SparkAppHandle handle, String appId, long loadJobId, SparkResource resource) {
+    public void killEtlJob(SparkAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException {
         if (resource.isYarnMaster()) {
             Preconditions.checkNotNull(appId);
-            YarnClient client = startYarnClient(resource);
-            try {
-                try {
-                    client.killApplication(ConverterUtils.toApplicationId(appId));
-                    LOG.info("yarn application -kill {}", appId);
-                } catch (YarnException | IOException e) {
-                    LOG.warn("yarn application kill failed. app id: {}, load job id: {}", appId, loadJobId, e);
-                }
-            } finally {
-                stopYarnClient(client);
+            // prepare yarn config
+            String configDir = resource.prepareYarnConfig();
+            // yarn client path
+            String yarnClient = Config.yarn_client_path;

Review comment:
       ok








[GitHub] [incubator-doris] morningman commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r476446542



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
##########
@@ -140,93 +136,96 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
                 .setAppName(String.format(ETL_JOB_NAME, loadLabel))
                 .setSparkHome(sparkHome)
                 .addAppArgs(jobConfigHdfsPath)
-                .redirectError()
-                .redirectOutput(new File(Config.sys_log_dir + "/spark-submitter.log"));
+                .redirectError();
+                //.redirectOutput(new File(Config.sys_log_dir + "/spark-submitter.log"));

Review comment:
       Remove unused code

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {
+    private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitors.class);
+    // 5min
+    private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000;
+
+    private LogMonitor logMonitor;
+
+    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) {
+        return new LogMonitor(handle);
+    }
+
+    private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
+        switch (yarnState) {
+            case SUBMITTED:
+            case ACCEPTED:
+                return SparkLoadAppHandle.State.SUBMITTED;
+            case RUNNING:
+                return SparkLoadAppHandle.State.RUNNING;
+            case FINISHED:
+                return SparkLoadAppHandle.State.FINISHED;
+            case FAILED:
+                return SparkLoadAppHandle.State.FAILED;
+            case KILLED:
+                return SparkLoadAppHandle.State.KILLED;
+            default:
+                // NEW NEW_SAVING
+                return SparkLoadAppHandle.State.UNKNOWN;
+        }
+    }
+
+    public static class LogMonitor extends Thread {
+        private final Process process;
+        private SparkLoadAppHandle handle;
+        private long submitTimeoutMs;
+        private boolean isStop;
+
+        private static final String STATE = "state";
+        private static final String QUEUE = "queue";
+        private static final String START_TIME = "start time";
+        private static final String FINAL_STATUS = "final status";
+        private static final String URL = "tracking URL";
+        private static final String USER = "user";
+
+        public LogMonitor(SparkLoadAppHandle handle) {
+            this.handle = handle;
+            this.process = handle.getProcess();
+            this.isStop = false;
+        }
+
+        public void setSubmitTimeoutMs(long submitTimeoutMs) {
+            this.submitTimeoutMs = submitTimeoutMs;
+        }
+
+        // Monitor the process's output
+        @Override
+        public void run() {
+            BufferedReader outReader = null;
+            String line = null;
+            long startTime = System.currentTimeMillis();
+            try {
+                Preconditions.checkState(process.isAlive());
+                outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                while (!isStop && (line = outReader.readLine()) != null) {
+                    LOG.info("Monitor Log: " + line);
+                    // parse state and appId
+                    if (line.contains(STATE)) {
+                        SparkLoadAppHandle.State oldState = handle.getState();
+                        SparkLoadAppHandle.State newState = oldState;
+                        // 1. state
+                        String state = regexGetState(line);
+                        if (state != null) {
+                            YarnApplicationState yarnState = YarnApplicationState.valueOf(state);
+                            newState = fromYarnState(yarnState);
+                            if (newState != oldState) {
+                                handle.setState(newState);
+                            }
+                        }
+                        // 2. appId
+                        String appId = regexGetAppId(line);
+                        if (appId != null) {
+                            if (!appId.equals(handle.getAppId())) {
+                                handle.setAppId(appId);
+                            }
+                        }
+
+                        LOG.info("spark appId that handle get is {}, state: {}", handle.getAppId(), handle.getState().toString());
+                        switch (newState) {
+                            case UNKNOWN:
+                            case CONNECTED:
+                            case SUBMITTED:
+                                // If the app stays in the UNKNOWN/CONNECTED/SUBMITTED state for more than submitTimeoutMs
+                                // stop monitoring and kill the process
+                                if (System.currentTimeMillis() - startTime > submitTimeoutMs) {
+                                    isStop = true;
+                                    handle.kill();
+                                }
+                                break;
+                            case RUNNING:
+                            case FINISHED:
+                                // There's no need to parse all logs of handle process to get all the information.
+                                // As soon as the state changes to RUNNING/KILLED/FAILED/FINISHED/LOST,
+                                // stop monitoring but keep the process alive.
+                                isStop = true;
+                                break;
+                            case KILLED:
+                            case FAILED:
+                            case LOST:
+                                // If the state changes to KILLED/FAILED/LOST,
+                                // stop monitoring and kill the process
+                                isStop = true;
+                                handle.kill();
+                                break;
+                            default:
+                                Preconditions.checkState(false, "wrong spark app state");
+                        }
+                    }
+                    // parse other values
+                    else if (line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINAL_STATUS) ||

Review comment:
       If the line contains `state`, the while loop may exit. How do we guarantee that this `else if` block ever runs?
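
The concern can be reproduced in isolation with a simplified loop. The sketch below is not the PR's code: `MonitorLoopSketch`, the stop rule, and the sample lines are assumptions that only mimic the shape of `run()`, where a state line may set `isStop` before any detail line is read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for the monitor loop in the diff above.
final class MonitorLoopSketch {
    // Returns the detail lines that were parsed before the loop stopped.
    static List<String> parsedDetails(List<String> lines) {
        List<String> details = new ArrayList<>();
        boolean isStop = false;
        for (String line : lines) {
            if (isStop) {
                break;
            }
            if (line.contains("state")) {
                // Simplified rule: any state other than the waiting ones stops the loop.
                if (!line.contains("ACCEPTED") && !line.contains("SUBMITTED")) {
                    isStop = true;
                }
            } else if (line.contains("queue") || line.contains("user")) {
                details.add(line.trim());
            }
        }
        return details;
    }

    public static void main(String[] args) {
        // Details arrive while the app is still waiting, so they are captured.
        System.out.println(parsedDetails(Arrays.asList(
                "(state: ACCEPTED)", "queue: default", "user: doris", "(state: RUNNING)")));
        // The app is killed before any detail line is printed, so nothing is captured.
        System.out.println(parsedDetails(Arrays.asList(
                "(state: ACCEPTED)", "(state: KILLED)", "queue: default")));
    }
}
```

In the second input the detail line arrives after the terminal state, so the `else if` branch never sees it.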

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {

Review comment:
       Add a comment explaining what this class does

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {
+    private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitors.class);
+    // 5min
+    private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000;
+
+    private LogMonitor logMonitor;
+
+    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) {
+        return new LogMonitor(handle);
+    }
+
+    private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
+        switch (yarnState) {
+            case SUBMITTED:
+            case ACCEPTED:
+                return SparkLoadAppHandle.State.SUBMITTED;
+            case RUNNING:
+                return SparkLoadAppHandle.State.RUNNING;
+            case FINISHED:
+                return SparkLoadAppHandle.State.FINISHED;
+            case FAILED:
+                return SparkLoadAppHandle.State.FAILED;
+            case KILLED:
+                return SparkLoadAppHandle.State.KILLED;
+            default:
+                // NEW NEW_SAVING
+                return SparkLoadAppHandle.State.UNKNOWN;
+        }
+    }
+
+    public static class LogMonitor extends Thread {
+        private final Process process;
+        private SparkLoadAppHandle handle;
+        private long submitTimeoutMs;
+        private boolean isStop;
+
+        private static final String STATE = "state";
+        private static final String QUEUE = "queue";
+        private static final String START_TIME = "start time";
+        private static final String FINAL_STATUS = "final status";
+        private static final String URL = "tracking URL";
+        private static final String USER = "user";
+
+        public LogMonitor(SparkLoadAppHandle handle) {
+            this.handle = handle;
+            this.process = handle.getProcess();
+            this.isStop = false;
+        }
+
+        public void setSubmitTimeoutMs(long submitTimeoutMs) {
+            this.submitTimeoutMs = submitTimeoutMs;
+        }
+
+        // Monitor the process's output
+        @Override
+        public void run() {
+            BufferedReader outReader = null;
+            String line = null;
+            long startTime = System.currentTimeMillis();
+            try {
+                Preconditions.checkState(process.isAlive());

Review comment:
       How do we make sure the process is still alive here?

##########
File path: fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
##########
@@ -95,94 +132,103 @@ public void setUp() {
                 .SparkLibrary("", "", SparkRepository.SparkLibrary.LibType.SPARK2X, 0L));
     }
 
-    @Test
-    public void testSubmitEtlJob(@Mocked BrokerUtil brokerUtil, @Mocked SparkLauncher launcher,
-                                 @Injectable SparkAppHandle handle) throws IOException, LoadException {
-        new Expectations() {
-            {
-                launcher.startApplication((SparkAppHandle.Listener) any);
-                result = handle;
-                handle.getAppId();
-                returns(null, null, appId);
-                handle.getState();
-                returns(State.CONNECTED, State.SUBMITTED, State.RUNNING);
-            }
-        };
-
-        EtlJobConfig etlJobConfig = new EtlJobConfig(Maps.newHashMap(), etlOutputPath, label, null);
-        SparkResource resource = new SparkResource(resourceName);
-        new Expectations(resource) {
-            {
-                resource.prepareArchive();
-                result = archive;
-            }
-        };
+//    @Test

Review comment:
       Remove unused code

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {
+    private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitors.class);
+    // 5min
+    private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000;
+
+    private LogMonitor logMonitor;
+
+    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) {
+        return new LogMonitor(handle);
+    }
+
+    private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
+        switch (yarnState) {
+            case SUBMITTED:
+            case ACCEPTED:
+                return SparkLoadAppHandle.State.SUBMITTED;
+            case RUNNING:
+                return SparkLoadAppHandle.State.RUNNING;
+            case FINISHED:
+                return SparkLoadAppHandle.State.FINISHED;
+            case FAILED:
+                return SparkLoadAppHandle.State.FAILED;
+            case KILLED:
+                return SparkLoadAppHandle.State.KILLED;
+            default:
+                // NEW NEW_SAVING
+                return SparkLoadAppHandle.State.UNKNOWN;
+        }
+    }
+
+    public static class LogMonitor extends Thread {
+        private final Process process;
+        private SparkLoadAppHandle handle;
+        private long submitTimeoutMs;
+        private boolean isStop;
+
+        private static final String STATE = "state";
+        private static final String QUEUE = "queue";
+        private static final String START_TIME = "start time";
+        private static final String FINAL_STATUS = "final status";
+        private static final String URL = "tracking URL";
+        private static final String USER = "user";
+
+        public LogMonitor(SparkLoadAppHandle handle) {
+            this.handle = handle;
+            this.process = handle.getProcess();
+            this.isStop = false;
+        }
+
+        public void setSubmitTimeoutMs(long submitTimeoutMs) {
+            this.submitTimeoutMs = submitTimeoutMs;
+        }
+
+        // Monitor the process's output
+        @Override
+        public void run() {
+            BufferedReader outReader = null;
+            String line = null;
+            long startTime = System.currentTimeMillis();
+            try {
+                Preconditions.checkState(process.isAlive());
+                outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                while (!isStop && (line = outReader.readLine()) != null) {
+                    LOG.info("Monitor Log: " + line);
+                    // parse state and appId
+                    if (line.contains(STATE)) {

Review comment:
       You can add an example output line here, so that the reviewer can know what the line looks like.
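
As an illustration of the kind of line the monitor matches, here is a minimal sketch. The sample line is an assumption based on the `Application report for <appId> (state: <STATE>)` message that Spark's YARN client usually logs; the exact prefix depends on the logging layout. The class and method names are hypothetical and not part of this PR.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StateLineExample {
    // Assumed sample line; the timestamp and logger prefix vary with the log layout.
    static final String SAMPLE =
            "20/08/18 08:07:24 INFO yarn.Client: Application report for "
            + "application_1573630236805_6763648 (state: ACCEPTED)";

    // YARN application ids have the form application_<clusterTimestamp>_<sequence>.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");
    private static final Pattern STATE = Pattern.compile("\\(state: (\\w+)\\)");

    // Returns the first application id found in the line, or null if there is none.
    static String parseAppId(String line) {
        Matcher m = APP_ID.matcher(line);
        return m.find() ? m.group() : null;
    }

    // Returns the state name inside "(state: ...)", or null if the line has no state.
    static String parseState(String line) {
        Matcher m = STATE.matcher(line);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(parseAppId(SAMPLE) + " " + parseState(SAMPLE));
    }
}
```

A comment with a sample line like this next to `line.contains(STATE)` would let a reader check the parsing logic without running a job.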




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477199868



##########
File path: fe/fe-core/src/main/java/org/apache/doris/common/Config.java
##########
@@ -543,6 +543,14 @@
     @ConfField
     public static String spark_resource_path = "";
 
+    /**
+     * Default yarn client path
+     */
+    public static String yarn_client_path = PaloFe.DORIS_HOME_DIR + "/lib/yarn-client/hadoop/bin/yarn";

Review comment:
       This field is missing the `@ConfField` annotation.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
##########
@@ -74,17 +67,21 @@
     private static final String JOB_CONFIG_DIR = "configs";
     private static final String ETL_JOB_NAME = "doris__%s";
     // 5min
-    private static final int GET_APPID_MAX_RETRY_TIMES = 300;
-    private static final int GET_APPID_SLEEP_MS = 1000;
-
-    class SparkAppListener implements Listener {
+    private static final long GET_APPID_TIMEOUT_MS = 300000L;
+    // 30s
+    private static final long EXEC_CMD_TIMEOUT_MS = 30000L;
+    // yarn command
+    private static final String YARN_STATUS_CMD = "%s --config %s application -status %s";
+    private static final String YARN_KILL_CMD = "%s --config %s application -kill %s";
+
+    class SparkAppListener implements SparkLoadAppHandle.Listener {

Review comment:
       Not used?
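
Separately from the listener question, the two command templates in this hunk can be expanded as in the sketch below. Only the two format strings come from the diff; the class name, method names, and the sample paths are hypothetical and chosen for illustration.

```java
public class YarnCommandExample {
    // Templates copied from the hunk above.
    static final String YARN_STATUS_CMD = "%s --config %s application -status %s";
    static final String YARN_KILL_CMD = "%s --config %s application -kill %s";

    // Builds the status command line for one application.
    static String statusCommand(String yarnClient, String configDir, String appId) {
        return String.format(YARN_STATUS_CMD, yarnClient, configDir, appId);
    }

    // Builds the kill command line for one application.
    static String killCommand(String yarnClient, String configDir, String appId) {
        return String.format(YARN_KILL_CMD, yarnClient, configDir, appId);
    }

    public static void main(String[] args) {
        // Hypothetical paths; the real values would come from Config and the Spark resource.
        System.out.println(statusCommand("/opt/hadoop/bin/yarn", "/tmp/yarn-conf",
                "application_1573630236805_6763648"));
        System.out.println(killCommand("/opt/hadoop/bin/yarn", "/tmp/yarn-conf",
                "application_1573630236805_6763648"));
    }
}
```

Because the templates are joined with single spaces, a client path or config directory containing spaces would break a naive split into arguments; passing the parts as a list to `ProcessBuilder` avoids that.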






[GitHub] [incubator-doris] xy720 commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477185928



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {

Review comment:
       How about SparkLauncherMonitor?






[GitHub] [incubator-doris] xy720 commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477182446



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
##########
@@ -140,93 +137,98 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
                 .setAppName(String.format(ETL_JOB_NAME, loadLabel))
                 .setSparkHome(sparkHome)
                 .addAppArgs(jobConfigHdfsPath)
-                .redirectError()
-                .redirectOutput(new File(Config.sys_log_dir + "/spark-submitter.log"));
+                .redirectError();
 
         // spark configs
         for (Map.Entry<String, String> entry : resource.getSparkConfigs().entrySet()) {
             launcher.setConf(entry.getKey(), entry.getValue());
         }
 
         // start app
-        SparkAppHandle handle = null;
+        SparkLoadAppHandle handle = null;
         State state = null;
         String appId = null;
-        int retry = 0;
         String errMsg = "start spark app failed. error: ";
         try {
-            handle = launcher.startApplication(new SparkAppListener());
+            Process process = launcher.launch();
+            handle = new SparkLoadAppHandle(process);
+            handle.addListener(new SparkAppListener());
+            if (!FeConstants.runningUnitTest) {
+                SparkLauncherMonitors.LogMonitor logMonitor = SparkLauncherMonitors.createLogMonitor(handle);
+                logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
+                logMonitor.start();
+                try {
+                    logMonitor.join();
+                } catch (InterruptedException e) {
+                    logMonitor.interrupt();
+                    throw new LoadException(errMsg + e.getMessage());
+                }
+            }
+            appId = handle.getAppId();
+            state = handle.getState();
         } catch (IOException e) {
             LOG.warn(errMsg, e);
             throw new LoadException(errMsg + e.getMessage());
         }
 
-        while (retry++ < GET_APPID_MAX_RETRY_TIMES) {
-            appId = handle.getAppId();
-            if (appId != null) {
-                break;
-            }
-
-            // check state and retry
-            state = handle.getState();
-            if (fromSparkState(state) == TEtlState.CANCELLED) {
-                throw new LoadException(errMsg + "spark app state: " + state.toString());
-            }
-            if (retry >= GET_APPID_MAX_RETRY_TIMES) {
-                throw new LoadException(errMsg + "wait too much time for getting appid. spark app state: "
-                                                + state.toString());
-            }
+        if (fromSparkState(state) == TEtlState.CANCELLED) {
+            throw new LoadException(errMsg + "spark app state: " + state.toString() + ", loadJobId:" + loadJobId);
+        }
 
-            // log
-            if (retry % 10 == 0) {
-                LOG.info("spark appid that handle get is null. load job id: {}, state: {}, retry times: {}",
-                         loadJobId, state.toString(), retry);
-            }
-            try {
-                Thread.sleep(GET_APPID_SLEEP_MS);
-            } catch (InterruptedException e) {
-                LOG.warn(e.getMessage());
-            }
+        if (appId == null) {
+            throw new LoadException(errMsg + "Failed to get appId from handle. spark app state: "
+                    + state.toString() + ", loadJobId:" + loadJobId);
         }
 
         // success
         attachment.setAppId(appId);
         attachment.setHandle(handle);
     }
 
-    public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long loadJobId, String etlOutputPath,
-                                     SparkResource resource, BrokerDesc brokerDesc) {
+    public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long loadJobId, String etlOutputPath,
+                                     SparkResource resource, BrokerDesc brokerDesc) throws LoadException {
         EtlStatus status = new EtlStatus();
 
         if (resource.isYarnMaster()) {
-            // state from yarn
             Preconditions.checkState(appId != null && !appId.isEmpty());

Review comment:
       done






[GitHub] [incubator-doris] xy720 commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477152907



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {
+    private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitors.class);
+    // 5min
+    private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000;
+
+    private LogMonitor logMonitor;
+
+    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) {
+        return new LogMonitor(handle);
+    }
+
+    private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
+        switch (yarnState) {
+            case SUBMITTED:
+            case ACCEPTED:
+                return SparkLoadAppHandle.State.SUBMITTED;
+            case RUNNING:
+                return SparkLoadAppHandle.State.RUNNING;
+            case FINISHED:
+                return SparkLoadAppHandle.State.FINISHED;
+            case FAILED:
+                return SparkLoadAppHandle.State.FAILED;
+            case KILLED:
+                return SparkLoadAppHandle.State.KILLED;
+            default:
+                // NEW NEW_SAVING
+                return SparkLoadAppHandle.State.UNKNOWN;
+        }
+    }
+
+    public static class LogMonitor extends Thread {
+        private final Process process;
+        private SparkLoadAppHandle handle;
+        private long submitTimeoutMs;
+        private boolean isStop;
+
+        private static final String STATE = "state";
+        private static final String QUEUE = "queue";
+        private static final String START_TIME = "start time";
+        private static final String FINAL_STATUS = "final status";
+        private static final String URL = "tracking URL";
+        private static final String USER = "user";
+
+        public LogMonitor(SparkLoadAppHandle handle) {
+            this.handle = handle;
+            this.process = handle.getProcess();
+            this.isStop = false;
+        }
+
+        public void setSubmitTimeoutMs(long submitTimeoutMs) {
+            this.submitTimeoutMs = submitTimeoutMs;
+        }
+
+        // Monitor the process's output
+        @Override
+        public void run() {
+            BufferedReader outReader = null;
+            String line = null;
+            long startTime = System.currentTimeMillis();
+            try {
+                Preconditions.checkState(process.isAlive());

Review comment:
       No need to make sure the process is alive. We can get output even if the process is not alive.
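
A minimal sketch of that claim, assuming a child process with small output: the child is allowed to exit first, and its buffered output is read afterwards. The class name is hypothetical, and the child here is simply the current JVM run with `-version`; it is not the spark-submit process from this PR.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Paths;

public class DeadProcessOutputExample {
    // Starts a short-lived child, waits for it to exit, then reads its output.
    static String readAfterExit() {
        try {
            // Run the current JVM with -version: it prints a few lines and exits.
            String java = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
            Process process = new ProcessBuilder(java, "-version")
                    .redirectErrorStream(true) // -version writes to stderr
                    .start();
            process.waitFor(); // from here on process.isAlive() is false

            StringBuilder out = new StringBuilder();
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    out.append(line).append('\n');
                }
            }
            return out.toString();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException("failed to run child process", e);
        }
    }

    public static void main(String[] args) {
        System.out.print(readAfterExit());
    }
}
```

Waiting before reading is only safe here because the output is tiny; a child that writes more than the pipe buffer holds would block until someone reads, which is one reason to read in a loop as the monitor does rather than assert liveness up front.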






[GitHub] [incubator-doris] xy720 commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477153148



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {
+    private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitors.class);
+    // 5min
+    private static final long SUBMIT_APP_TIMEOUT_MS = 300 * 1000;
+
+    private LogMonitor logMonitor;
+
+    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) {
+        return new LogMonitor(handle);
+    }
+
+    private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
+        switch (yarnState) {
+            case SUBMITTED:
+            case ACCEPTED:
+                return SparkLoadAppHandle.State.SUBMITTED;
+            case RUNNING:
+                return SparkLoadAppHandle.State.RUNNING;
+            case FINISHED:
+                return SparkLoadAppHandle.State.FINISHED;
+            case FAILED:
+                return SparkLoadAppHandle.State.FAILED;
+            case KILLED:
+                return SparkLoadAppHandle.State.KILLED;
+            default:
+                // NEW NEW_SAVING
+                return SparkLoadAppHandle.State.UNKNOWN;
+        }
+    }
+
+    public static class LogMonitor extends Thread {
+        private final Process process;
+        private SparkLoadAppHandle handle;
+        private long submitTimeoutMs;
+        private boolean isStop;
+
+        private static final String STATE = "state";
+        private static final String QUEUE = "queue";
+        private static final String START_TIME = "start time";
+        private static final String FINAL_STATUS = "final status";
+        private static final String URL = "tracking URL";
+        private static final String USER = "user";
+
+        public LogMonitor(SparkLoadAppHandle handle) {
+            this.handle = handle;
+            this.process = handle.getProcess();
+            this.isStop = false;
+        }
+
+        public void setSubmitTimeoutMs(long submitTimeoutMs) {
+            this.submitTimeoutMs = submitTimeoutMs;
+        }
+
+        // Monitor the process's output
+        @Override
+        public void run() {
+            BufferedReader outReader = null;
+            String line = null;
+            long startTime = System.currentTimeMillis();
+            try {
+                Preconditions.checkState(process.isAlive());
+                outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                while (!isStop && (line = outReader.readLine()) != null) {
+                    LOG.info("Monitor Log: " + line);
+                    // parse state and appId
+                    if (line.contains(STATE)) {

Review comment:
       done






[GitHub] [incubator-doris] xy720 commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r472836715



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/YarnApplicationReport.java
##########
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.common.LoadException;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Convert the output of the `yarn application -status` command to an application report.
+ * Input sample:
+ * -------------------
+ * Application Report :
+ * 	Application-Id : application_1573630236805_6763648
+ * 	Application-Name : doris_label_test
+ * 	Application-Type : SPARK-2.4.1
+ * 	User : test
+ * 	Queue : test-queue
+ * 	Start-Time : 1597654469958
+ * 	Finish-Time : 1597654801939
+ * 	Progress : 100%
+ * 	State : FINISHED
+ * 	Final-State : SUCCEEDED
+ * 	Tracking-URL : 127.0.0.1:8004/history/application_1573630236805_6763648/1
+ * 	RPC Port : 40236
+ * 	AM Host : host-name
+ * 	------------------
+ *
+ * 	Output:
+ * 	ApplicationReport
+ */
+public class YarnApplicationReport {
+    private static final String APPLICATION_ID = "Application-Id";
+    private static final String APPLICATION_TYPE = "Application-Type";
+    private static final String APPLICATION_NAME = "Application-Name";
+    private static final String USER = "User";
+    private static final String QUEUE = "Queue";
+    private static final String START_TIME = "Start-Time";
+    private static final String FINISH_TIME = "Finish-Time";
+    private static final String PROGRESS = "Progress";
+    private static final String STATE = "State";
+    private static final String FINAL_STATE = "Final-State";
+    private static final String TRACKING_URL = "Tracking-URL";
+    private static final String RPC_PORT = "RPC Port";
+    private static final String AM_HOST = "AM Host";
+    private static final String DIAGNOSTICS = "Diagnostics";
+
+    private ApplicationReport report;
+
+    public YarnApplicationReport(String output) throws LoadException {
+        this.report = new ApplicationReportPBImpl();
+        parseFromOutput(output);
+    }
+
+    public ApplicationReport getReport() {
+        return report;
+    }
+
+    private void parseFromOutput(String output) throws LoadException {
+        Map<String, String> reportMap = Maps.newHashMap();
+        List<String> lines = Splitter.onPattern("\n").trimResults().splitToList(output);
+        // Application-Id : application_1573630236805_6763648 ==> (Application-Id, application_1573630236805_6763648)
+        for (String line : lines) {
+            List<String> entry = Splitter.onPattern(":").limit(2).trimResults().splitToList(line);
+            Preconditions.checkState(entry.size() <= 2);

Review comment:
       changed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wuyunfeng commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
wuyunfeng commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477209180



##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
##########
@@ -243,6 +264,16 @@ protected void setProperties(Map<String, String> properties) throws DdlException
         return sparkConfigs;

Review comment:
       ```suggestion
           return sparkConfig;
   ```
   Does the method name also need to be changed?






[GitHub] [incubator-doris] morningman commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r473516074



##########
File path: fe/fe-core/src/main/java/org/apache/doris/common/Config.java
##########
@@ -543,6 +543,15 @@
     @ConfField
     public static String spark_resource_path = "";
 
+    /**
+     * Default yarn client path
+     */
+    @ConfField(mutable = true, masterOnly = true)

Review comment:
       This should be immutable.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
##########
@@ -262,19 +273,22 @@ public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long loadJ
         return status;
     }
 
-    public void killEtlJob(SparkAppHandle handle, String appId, long loadJobId, SparkResource resource) {
+    public void killEtlJob(SparkAppHandle handle, String appId, long loadJobId, SparkResource resource) throws LoadException {
         if (resource.isYarnMaster()) {
             Preconditions.checkNotNull(appId);
-            YarnClient client = startYarnClient(resource);
-            try {
-                try {
-                    client.killApplication(ConverterUtils.toApplicationId(appId));
-                    LOG.info("yarn application -kill {}", appId);
-                } catch (YarnException | IOException e) {
-                    LOG.warn("yarn application kill failed. app id: {}, load job id: {}", appId, loadJobId, e);
-                }
-            } finally {
-                stopYarnClient(client);
+            // prepare yarn config
+            String configDir = resource.prepareYarnConfig();
+            // yarn client path
+            String yarnClient = Config.yarn_client_path;

Review comment:
       It would be better to create a function called `getYarnClientPath()` and check in that function whether the binary file exists.
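
A minimal sketch of what such a helper could look like, using only the JDK. The class name `YarnClientPathSketch`, the nested `LoadException` stand-in, and the `buildCommand` helper are hypothetical and are not taken from the PR. The command layout follows the `yarn --config confdir application <-kill | -status> <Application ID>` form given in the PR description.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

public class YarnClientPathSketch {
    /** Hypothetical stand-in for org.apache.doris.common.LoadException. */
    public static class LoadException extends Exception {
        public LoadException(String msg) {
            super(msg);
        }
    }

    /**
     * Hypothetical helper: validates the configured yarn client path once,
     * so callers do not repeat the existence check before each command.
     */
    public static String getYarnClientPath(String configuredPath) throws LoadException {
        if (configuredPath == null || configuredPath.isEmpty()) {
            throw new LoadException("yarn client path is not configured");
        }
        File file = new File(configuredPath);
        if (!file.isFile()) {
            throw new LoadException("yarn client does not exist at path: " + configuredPath);
        }
        if (!file.canExecute()) {
            throw new LoadException("yarn client is not executable: " + configuredPath);
        }
        return file.getAbsolutePath();
    }

    /** Builds: yarn --config <configDir> application <-status | -kill> <appId> */
    public static List<String> buildCommand(String yarnClient, String configDir,
                                            String action, String appId) {
        return Arrays.asList(yarnClient, "--config", configDir, "application", action, appId);
    }
}
```

With a helper like this, `killEtlJob` and `getEtlJobStatus` could both fail early with a clear message when the binary is missing, instead of failing later when the command is executed.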






[GitHub] [incubator-doris] wuyunfeng commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
wuyunfeng commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477216354



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/YarnApplicationReport.java
##########
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.common.LoadException;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Convert output string of command `yarn application -status` to application report.
+ * Input sample:
+ * -------------------
+ * Application Report :
+ * 	Application-Id : application_1573630236805_6763648
+ * 	Application-Name : doris_label_test
+ * 	Application-Type : SPARK-2.4.1
+ * 	User : test
+ * 	Queue : test-queue
+ * 	Start-Time : 1597654469958
+ * 	Finish-Time : 1597654801939
+ * 	Progress : 100%
+ * 	State : FINISHED
+ * 	Final-State : SUCCEEDED
+ * 	Tracking-URL : 127.0.0.1:8004/history/application_1573630236805_6763648/1
+ * 	RPC Port : 40236
+ * 	AM Host : host-name
+ * 	------------------
+ *
+ * 	Output:
+ * 	ApplicationReport
+ */
+public class YarnApplicationReport {
+    private static final String APPLICATION_ID = "Application-Id";
+    private static final String APPLICATION_TYPE = "Application-Type";
+    private static final String APPLICATION_NAME = "Application-Name";
+    private static final String USER = "User";
+    private static final String QUEUE = "Queue";
+    private static final String START_TIME = "Start-Time";
+    private static final String FINISH_TIME = "Finish-Time";
+    private static final String PROGRESS = "Progress";
+    private static final String STATE = "State";
+    private static final String FINAL_STATE = "Final-State";
+    private static final String TRACKING_URL = "Tracking-URL";
+    private static final String RPC_PORT = "RPC Port";
+    private static final String AM_HOST = "AM Host";
+    private static final String DIAGNOSTICS = "Diagnostics";
+
+    private ApplicationReport report;
+
+    public YarnApplicationReport(String output) throws LoadException {
+        this.report = new ApplicationReportPBImpl();
+        parseFromOutput(output);
+    }
+
+    public ApplicationReport getReport() {
+        return report;
+    }
+
+    private void parseFromOutput(String output) throws LoadException {
+        Map<String, String> reportMap = Maps.newHashMap();
+        List<String> lines = Splitter.onPattern("\n").trimResults().splitToList(output);
+        // Application-Id : application_1573630236805_6763648 ==> (Application-Id, application_1573630236805_6763648)
+        for (String line : lines) {
+            List<String> entry = Splitter.onPattern(":").limit(2).trimResults().splitToList(line);
+            Preconditions.checkState(entry.size() <= 2, line);
+            if (entry.size() > 1) {
+                reportMap.put(entry.get(0), entry.get(1));
+            } else {
+                reportMap.put(entry.get(0), "");
+            }
+        }
+
+        try {
+            report.setApplicationId(ConverterUtils.toApplicationId(reportMap.get(APPLICATION_ID)));

Review comment:
       The format seems to be wrong?






[GitHub] [incubator-doris] kangkaisen commented on pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#issuecomment-679967675


   @xy720 Hi, do we have any user doc for this PR?




[GitHub] [incubator-doris] wuyunfeng commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
wuyunfeng commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477160493



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
##########
@@ -140,93 +137,98 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
                 .setAppName(String.format(ETL_JOB_NAME, loadLabel))
                 .setSparkHome(sparkHome)
                 .addAppArgs(jobConfigHdfsPath)
-                .redirectError()
-                .redirectOutput(new File(Config.sys_log_dir + "/spark-submitter.log"));
+                .redirectError();
 
         // spark configs
         for (Map.Entry<String, String> entry : resource.getSparkConfigs().entrySet()) {
             launcher.setConf(entry.getKey(), entry.getValue());
         }
 
         // start app
-        SparkAppHandle handle = null;
+        SparkLoadAppHandle handle = null;
         State state = null;
         String appId = null;
-        int retry = 0;
         String errMsg = "start spark app failed. error: ";
         try {
-            handle = launcher.startApplication(new SparkAppListener());
+            Process process = launcher.launch();
+            handle = new SparkLoadAppHandle(process);
+            handle.addListener(new SparkAppListener());
+            if (!FeConstants.runningUnitTest) {
+                SparkLauncherMonitors.LogMonitor logMonitor = SparkLauncherMonitors.createLogMonitor(handle);
+                logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
+                logMonitor.start();
+                try {
+                    logMonitor.join();
+                } catch (InterruptedException e) {
+                    logMonitor.interrupt();
+                    throw new LoadException(errMsg + e.getMessage());
+                }
+            }
+            appId = handle.getAppId();
+            state = handle.getState();
         } catch (IOException e) {
             LOG.warn(errMsg, e);
             throw new LoadException(errMsg + e.getMessage());
         }
 
-        while (retry++ < GET_APPID_MAX_RETRY_TIMES) {
-            appId = handle.getAppId();
-            if (appId != null) {
-                break;
-            }
-
-            // check state and retry
-            state = handle.getState();
-            if (fromSparkState(state) == TEtlState.CANCELLED) {
-                throw new LoadException(errMsg + "spark app state: " + state.toString());
-            }
-            if (retry >= GET_APPID_MAX_RETRY_TIMES) {
-                throw new LoadException(errMsg + "wait too much time for getting appid. spark app state: "
-                                                + state.toString());
-            }
+        if (fromSparkState(state) == TEtlState.CANCELLED) {
+            throw new LoadException(errMsg + "spark app state: " + state.toString() + ", loadJobId:" + loadJobId);
+        }
 
-            // log
-            if (retry % 10 == 0) {
-                LOG.info("spark appid that handle get is null. load job id: {}, state: {}, retry times: {}",
-                         loadJobId, state.toString(), retry);
-            }
-            try {
-                Thread.sleep(GET_APPID_SLEEP_MS);
-            } catch (InterruptedException e) {
-                LOG.warn(e.getMessage());
-            }
+        if (appId == null) {
+            throw new LoadException(errMsg + "Failed to get appId from handle. spark app state: "
+                    + state.toString() + ", loadJobId:" + loadJobId);
         }
 
         // success
         attachment.setAppId(appId);
         attachment.setHandle(handle);
     }
 
-    public EtlStatus getEtlJobStatus(SparkAppHandle handle, String appId, long loadJobId, String etlOutputPath,
-                                     SparkResource resource, BrokerDesc brokerDesc) {
+    public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long loadJobId, String etlOutputPath,
+                                     SparkResource resource, BrokerDesc brokerDesc) throws LoadException {
         EtlStatus status = new EtlStatus();
 
         if (resource.isYarnMaster()) {
-            // state from yarn
             Preconditions.checkState(appId != null && !appId.isEmpty());

Review comment:
       Maybe move the `Preconditions.checkState` statement outside the `if` statement?

##########
File path: fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
##########
@@ -243,6 +264,16 @@ protected void setProperties(Map<String, String> properties) throws DdlException
         return sparkConfigs;
     }
 
+    private Map<String, String> getSparkHadoopConfigs(Map<String, String> properties) {
+        Map<String, String> sparkConfigs = Maps.newHashMap();

Review comment:
       ```suggestion
           Map<String, String> sparkConfig = Maps.newHashMap();
   ```

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitors.java
##########
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SparkLauncherMonitors {

Review comment:
       I do not like the `xxxxs` (plural) class name.

##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ConfigFile.java
##########
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.common.LoadException;
+
+public interface ConfigFile {

Review comment:
       Can you add some comments?






[GitHub] [incubator-doris] morningman commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r472090745



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/YarnApplicationReport.java
##########
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.common.LoadException;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Convert output string of command `yarn application -status` to application report.
+ * Input sample:
+ * -------------------
+ * Application Report :
+ * 	Application-Id : application_1573630236805_6763648
+ * 	Application-Name : doris_label_test
+ * 	Application-Type : SPARK-2.4.1
+ * 	User : test
+ * 	Queue : test-queue
+ * 	Start-Time : 1597654469958
+ * 	Finish-Time : 1597654801939
+ * 	Progress : 100%
+ * 	State : FINISHED
+ * 	Final-State : SUCCEEDED
+ * 	Tracking-URL : 127.0.0.1:8004/history/application_1573630236805_6763648/1
+ * 	RPC Port : 40236
+ * 	AM Host : host-name
+ * 	------------------
+ *
+ * 	Output:
+ * 	ApplicationReport
+ */
+public class YarnApplicationReport {
+    private static final String APPLICATION_ID = "Application-Id";
+    private static final String APPLICATION_TYPE = "Application-Type";
+    private static final String APPLICATION_NAME = "Application-Name";
+    private static final String USER = "User";
+    private static final String QUEUE = "Queue";
+    private static final String START_TIME = "Start-Time";
+    private static final String FINISH_TIME = "Finish-Time";
+    private static final String PROGRESS = "Progress";
+    private static final String STATE = "State";
+    private static final String FINAL_STATE = "Final-State";
+    private static final String TRACKING_URL = "Tracking-URL";
+    private static final String RPC_PORT = "RPC Port";
+    private static final String AM_HOST = "AM Host";
+    private static final String DIAGNOSTICS = "Diagnostics";
+
+    private ApplicationReport report;
+
+    public YarnApplicationReport(String output) throws LoadException {
+        this.report = new ApplicationReportPBImpl();
+        parseFromOutput(output);
+    }
+
+    public ApplicationReport getReport() {
+        return report;
+    }
+
+    private void parseFromOutput(String output) throws LoadException {
+        Map<String, String> reportMap = Maps.newHashMap();
+        List<String> lines = Splitter.onPattern("\n").trimResults().splitToList(output);
+        // Application-Id : application_1573630236805_6763648 ==> (Application-Id, application_1573630236805_6763648)
+        for (String line : lines) {
+            List<String> entry = Splitter.onPattern(":").limit(2).trimResults().splitToList(line);
+            Preconditions.checkState(entry.size() <= 2);

Review comment:
       Preconditions.checkState(entry.size() <= 2, line);
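
For illustration, the key/value parsing in the hunk above can be sketched with the JDK alone. This is a sketch under assumptions, not the PR's implementation: the class name `YarnStatusOutputSketch` is hypothetical, `String.split(":", 2)` stands in for the Guava `Splitter` with `limit(2)`, and blank lines are skipped rather than stored under an empty key. Splitting with a limit of 2 never yields more than two parts, so values that contain colons, such as the Tracking-URL, stay intact.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class YarnStatusOutputSketch {
    /**
     * Parses lines of the form "Key : value" into an ordered map.
     * Only the first ':' on each line is treated as the separator.
     */
    public static Map<String, String> parse(String output) {
        Map<String, String> report = new LinkedHashMap<>();
        for (String rawLine : output.split("\n")) {
            String line = rawLine.trim();
            if (line.isEmpty()) {
                continue; // assumption: blank lines carry no information
            }
            // limit = 2: at most two parts, the second holds the rest of the line
            String[] entry = line.split(":", 2);
            String key = entry[0].trim();
            String value = entry.length > 1 ? entry[1].trim() : "";
            report.put(key, value);
        }
        return report;
    }
}
```

Passing the offending line as the message of the state check, as suggested above, makes a parse failure much easier to diagnose from the log.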






[GitHub] [incubator-doris] xy720 commented on a change in pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on a change in pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#discussion_r477182204



##########
File path: fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ConfigFile.java
##########
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.common.LoadException;
+
+public interface ConfigFile {

Review comment:
       ok






[GitHub] [incubator-doris] morningman merged pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383


   




[GitHub] [incubator-doris] xy720 commented on pull request #4383: [SparkLoad]Use the yarn command to get status and kill the application

Posted by GitBox <gi...@apache.org>.
xy720 commented on pull request #4383:
URL: https://github.com/apache/incubator-doris/pull/4383#issuecomment-680693360


   I will add the relevant user docs later.

