You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/25 00:34:20 UTC
[2/3] storm git commit: STORM-3110: Reintroduce user check and rely on 'user.name' when not running worker as launching user
STORM-3110: Reintroduce user check and rely on 'user.name' when not running worker as launching user
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e63fe124
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e63fe124
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e63fe124
Branch: refs/heads/1.x-branch
Commit: e63fe1246049380e694a7d1157930c751b33a21b
Parents: 1d43937
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Jun 21 13:46:35 2018 -0700
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Thu Jun 21 14:20:32 2018 -0700
----------------------------------------------------------------------
.../storm/daemon/supervisor/Container.java | 72 +++++++++-----------
.../daemon/supervisor/RunAsUserContainer.java | 12 ++++
.../storm/daemon/supervisor/ContainerTest.java | 54 ---------------
3 files changed, 44 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e63fe124/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
index fd619ac..10a9b6f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
@@ -20,7 +20,6 @@ package org.apache.storm.daemon.supervisor;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Writer;
import java.lang.ProcessBuilder.Redirect;
@@ -33,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.storm.Config;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LocalAssignment;
@@ -217,20 +215,18 @@ public abstract class Container implements Killable {
* @throws IOException on any error
*/
protected boolean isProcessAlive(long pid, String user) throws IOException {
- if (isOnWindows()) {
+ if (Utils.IS_ON_WINDOWS) {
return isWindowsProcessAlive(pid, user);
}
return isPosixProcessAlive(pid, user);
}
-
- @VisibleForTesting
- boolean isOnWindows() {
- return Utils.IS_ON_WINDOWS;
- }
-
+
private boolean isWindowsProcessAlive(long pid, String user) throws IOException {
boolean ret = false;
- try (BufferedReader in = new BufferedReader(new InputStreamReader(getWindowsProcessInputStream(pid)))) {
+ ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
+ pb.redirectError(Redirect.INHERIT);
+ Process p = pb.start();
+ try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
String read;
while ((read = in.readLine()) != null) {
if (read.contains("User Name:")) { //Check for : in case someone called their user "User Name"
@@ -240,11 +236,10 @@ public abstract class Container implements Killable {
if(userNameLineSplitOnWhitespace.size() == 2){
List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\"));
String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
- if (!user.equals(processUser)){
- LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
- }
- if(!processUser.isEmpty()){
+ if(user.equals(processUser)){
ret = true;
+ } else {
+ LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
}
} else {
LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}", read);
@@ -255,50 +250,36 @@ public abstract class Container implements Killable {
}
return ret;
}
-
- @VisibleForTesting
- InputStream getWindowsProcessInputStream(long pid) throws IOException {
- ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
- pb.redirectError(Redirect.INHERIT);
- Process p = pb.start();
- return p.getInputStream();
- }
-
-
+
private boolean isPosixProcessAlive(long pid, String user) throws IOException {
boolean ret = false;
- try (BufferedReader in = new BufferedReader(new InputStreamReader(getPosixProcessInputStream(pid)))) {
+ ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
+ pb.redirectError(Redirect.INHERIT);
+ Process p = pb.start();
+ try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
String first = in.readLine();
assert("USER".equals(first));
String processUser;
while ((processUser = in.readLine()) != null) {
- if (!user.equals(processUser)) {
- LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
- }
- if (!processUser.isEmpty()) {
+ if (user.equals(processUser)) {
ret = true;
break;
+ } else {
+ LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
}
}
}
return ret;
}
-
- @VisibleForTesting
- InputStream getPosixProcessInputStream(long pid) throws IOException {
- ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
- pb.redirectError(Redirect.INHERIT);
- Process p = pb.start();
- return p.getInputStream();
- }
-
+
@Override
public boolean areAllProcessesDead() throws IOException {
Set<Long> pids = getAllPids();
- String user = getWorkerUser();
+ String user = getRunWorkerAsUser();
boolean allDead = true;
for (Long pid: pids) {
+ LOG.debug("Checking if pid {} owner {} is alive", pid, user);
if (!isProcessAlive(pid, user)) {
LOG.debug("{}: PID {} is dead", _workerId, pid);
} else {
@@ -495,7 +476,18 @@ public abstract class Container implements Killable {
throw new IllegalStateException("Could not recover the user for " + _workerId);
}
}
-
+
+ /**
+ * Returns the user that the worker process is running as.
+ *
+ * The default behavior is to launch the worker as the user supervisor is running as (e.g. 'storm')
+ *
+ * @return the user that the worker process is running as.
+ */
+ protected String getRunWorkerAsUser() {
+ return System.getProperty("user.name");
+ }
+
protected void saveWorkerUser(String user) throws IOException {
_type.assertFull();
LOG.info("SET worker-user {} {}", _workerId, user);
http://git-wip-us.apache.org/repos/asf/storm/blob/e63fe124/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
index 29b8576..687c823 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
@@ -93,4 +93,16 @@ public class RunAsUserContainer extends BasicContainer {
List<String> commandPrefix = null;
SupervisorUtils.processLauncher(_conf, user, commandPrefix, args, null, logPrefix, processExitCallback, targetDir);
}
+
+ /**
+ * If 'supervisor.run.worker.as.user' is set, worker will be launched as the user that launched the topology.
+ */
+ @Override
+ protected String getRunWorkerAsUser() {
+ try {
+ return getWorkerUser();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e63fe124/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
index 4c9437a..07427bd 100644
--- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -20,10 +20,8 @@ package org.apache.storm.daemon.supervisor;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,7 +36,6 @@ import org.apache.storm.Config;
import org.apache.storm.daemon.supervisor.Container.ContainerType;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileRequest;
-import org.junit.Assert;
import org.junit.Test;
import org.yaml.snakeyaml.Yaml;
@@ -265,55 +262,4 @@ public class ContainerTest {
verify(ops).deleteIfExists(eq(workerRoot), eq(user), any(String.class));
verify(ops).deleteIfExists(workerUserFile);
}
-
- @Test
- public void testAreAllProcessesDeadPosix() throws Exception {
- final String topoId = "test_topology";
- final Map<String, Object> superConf = new HashMap<>();
- AdvancedFSOps ops = mock(AdvancedFSOps.class);
- when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
-
- LocalAssignment la = new LocalAssignment();
- la.set_topology_id(topoId);
- MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", 8080, la, "worker", new HashMap<String, Object>(), ops);
-
- MockContainer spy = spy(mc);
- doReturn(false).when(spy).isOnWindows();
- when(spy.getAllPids()).thenReturn(Collections.singleton(0L));
- InputStream psout = new ByteArrayInputStream("USER\nmockuser".getBytes());
- when(spy.getPosixProcessInputStream(0)).thenReturn(psout);
-
- Assert.assertFalse(spy.areAllProcessesDead());
-
- psout = new ByteArrayInputStream("USER\n".getBytes());
- when(spy.getPosixProcessInputStream(0)).thenReturn(psout);
-
- Assert.assertTrue(spy.areAllProcessesDead());
- }
-
- @Test
- public void testAreAllProcessesDeadWindows() throws Exception {
- final String topoId = "test_topology";
- final Map<String, Object> superConf = new HashMap<>();
- AdvancedFSOps ops = mock(AdvancedFSOps.class);
- when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
-
- LocalAssignment la = new LocalAssignment();
- la.set_topology_id(topoId);
- MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
- "SUPERVISOR", 8080, la, "worker", new HashMap<String, Object>(), ops);
- MockContainer spy = spy(mc);
- doReturn(true).when(spy).isOnWindows();
- when(spy.getAllPids()).thenReturn(Collections.singleton(0L));
- InputStream psout = new ByteArrayInputStream("User Name: exampleDomain\\exampleUser".getBytes());
- doReturn(psout).when(spy).getWindowsProcessInputStream(0L);
-
- Assert.assertFalse(spy.areAllProcessesDead());
-
- psout = new ByteArrayInputStream("".getBytes());
- doReturn(psout).when(spy).getWindowsProcessInputStream(0L);
-
- Assert.assertTrue(spy.areAllProcessesDead());
- }
}