You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/25 00:34:20 UTC

[2/3] storm git commit: STORM-3110: Reintroduce user check and rely on 'user.name' when not running worker as launching user

STORM-3110: Reintroduce user check and rely on 'user.name' when not running worker as launching user


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e63fe124
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e63fe124
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e63fe124

Branch: refs/heads/1.x-branch
Commit: e63fe1246049380e694a7d1157930c751b33a21b
Parents: 1d43937
Author: Arun Mahadevan <ar...@apache.org>
Authored: Thu Jun 21 13:46:35 2018 -0700
Committer: Arun Mahadevan <ar...@apache.org>
Committed: Thu Jun 21 14:20:32 2018 -0700

----------------------------------------------------------------------
 .../storm/daemon/supervisor/Container.java      | 72 +++++++++-----------
 .../daemon/supervisor/RunAsUserContainer.java   | 12 ++++
 .../storm/daemon/supervisor/ContainerTest.java  | 54 ---------------
 3 files changed, 44 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e63fe124/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
index fd619ac..10a9b6f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java
@@ -20,7 +20,6 @@ package org.apache.storm.daemon.supervisor;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Writer;
 import java.lang.ProcessBuilder.Redirect;
@@ -33,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.Config;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
@@ -217,20 +215,18 @@ public abstract class Container implements Killable {
      * @throws IOException on any error
      */
     protected boolean isProcessAlive(long pid, String user) throws IOException {
-        if (isOnWindows()) {
+        if (Utils.IS_ON_WINDOWS) {
             return isWindowsProcessAlive(pid, user);
         }
         return isPosixProcessAlive(pid, user);
     }
-
-    @VisibleForTesting
-    boolean isOnWindows() {
-        return Utils.IS_ON_WINDOWS;
-    }
-
+    
     private boolean isWindowsProcessAlive(long pid, String user) throws IOException {
         boolean ret = false;
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(getWindowsProcessInputStream(pid)))) {
+        ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
+        pb.redirectError(Redirect.INHERIT);
+        Process p = pb.start();
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
             String read;
             while ((read = in.readLine()) != null) {
                 if (read.contains("User Name:")) { //Check for : in case someone called their user "User Name"
@@ -240,11 +236,10 @@ public abstract class Container implements Killable {
                     if(userNameLineSplitOnWhitespace.size() == 2){
                         List<String> userAndMaybeDomain = Arrays.asList(userNameLineSplitOnWhitespace.get(1).trim().split("\\\\"));
                         String processUser = userAndMaybeDomain.size() == 2 ? userAndMaybeDomain.get(1) : userAndMaybeDomain.get(0);
-                        if (!user.equals(processUser)){
-                            LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
-                        }
-                        if(!processUser.isEmpty()){
+                        if(user.equals(processUser)){
                             ret = true;
+                        } else {
+                            LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
                         }
                     } else {
                         LOG.error("Received unexpected output from tasklist command. Expected one colon in user name line. Line was {}", read);
@@ -255,50 +250,36 @@ public abstract class Container implements Killable {
         }
         return ret;
     }
-
-    @VisibleForTesting
-    InputStream getWindowsProcessInputStream(long pid) throws IOException {
-        ProcessBuilder pb = new ProcessBuilder("tasklist", "/fo", "list", "/fi", "pid eq " + pid, "/v");
-        pb.redirectError(Redirect.INHERIT);
-        Process p = pb.start();
-        return p.getInputStream();
-    }
-
-
+    
     private boolean isPosixProcessAlive(long pid, String user) throws IOException {
         boolean ret = false;
-        try (BufferedReader in = new BufferedReader(new InputStreamReader(getPosixProcessInputStream(pid)))) {
+        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
+        pb.redirectError(Redirect.INHERIT);
+        Process p = pb.start();
+        try (BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
             String first = in.readLine();
             assert("USER".equals(first));
             String processUser;
             while ((processUser = in.readLine()) != null) {
-                if (!user.equals(processUser)) {
-                    LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
-                }
-                if (!processUser.isEmpty()) {
+                if (user.equals(processUser)) {
                     ret = true;
                     break;
+                } else {
+                    LOG.info("Found {} running as {}, but expected it to be {}", pid, processUser, user);
                 }
             }
         }
         return ret;
     }
-
-    @VisibleForTesting
-    InputStream getPosixProcessInputStream(long pid) throws IOException {
-        ProcessBuilder pb = new ProcessBuilder("ps", "-o", "user", "-p", String.valueOf(pid));
-        pb.redirectError(Redirect.INHERIT);
-        Process p = pb.start();
-        return p.getInputStream();
-    }
-
+    
     @Override
     public boolean areAllProcessesDead() throws IOException {
         Set<Long> pids = getAllPids();
-        String user = getWorkerUser();
+        String user = getRunWorkerAsUser();
         
         boolean allDead = true;
         for (Long pid: pids) {
+            LOG.debug("Checking if pid {} owner {} is alive", pid, user);
             if (!isProcessAlive(pid, user)) {
                 LOG.debug("{}: PID {} is dead", _workerId, pid);
             } else {
@@ -495,7 +476,18 @@ public abstract class Container implements Killable {
             throw new IllegalStateException("Could not recover the user for " + _workerId);
         }
     }
-    
+
+    /**
+     * Returns the user that the worker process is running as.
+     *
+     * The default behavior is to launch the worker as the same user the supervisor is running as (e.g. 'storm').
+     *
+     * @return the user that the worker process is running as.
+     */
+    protected String getRunWorkerAsUser() {
+        return System.getProperty("user.name");
+    }
+
     protected void saveWorkerUser(String user) throws IOException {
         _type.assertFull();
         LOG.info("SET worker-user {} {}", _workerId, user);

http://git-wip-us.apache.org/repos/asf/storm/blob/e63fe124/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
index 29b8576..687c823 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java
@@ -93,4 +93,16 @@ public class RunAsUserContainer extends BasicContainer {
         List<String> commandPrefix = null;
         SupervisorUtils.processLauncher(_conf, user, commandPrefix, args, null, logPrefix, processExitCallback, targetDir);
     }
+
+    /**
+     * If 'supervisor.run.worker.as.user' is set, the worker will be launched as the user that launched the topology.
+     */
+    @Override
+    protected String getRunWorkerAsUser() {
+        try {
+            return getWorkerUser();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e63fe124/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
index 4c9437a..07427bd 100644
--- a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -20,10 +20,8 @@ package org.apache.storm.daemon.supervisor;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,7 +36,6 @@ import org.apache.storm.Config;
 import org.apache.storm.daemon.supervisor.Container.ContainerType;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileRequest;
-import org.junit.Assert;
 import org.junit.Test;
 import org.yaml.snakeyaml.Yaml;
 
@@ -265,55 +262,4 @@ public class ContainerTest {
         verify(ops).deleteIfExists(eq(workerRoot), eq(user), any(String.class));
         verify(ops).deleteIfExists(workerUserFile);
     }
-
-    @Test
-    public void testAreAllProcessesDeadPosix() throws Exception {
-        final String topoId = "test_topology";
-        final Map<String, Object> superConf = new HashMap<>();
-        AdvancedFSOps ops = mock(AdvancedFSOps.class);
-        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
-
-        LocalAssignment la = new LocalAssignment();
-        la.set_topology_id(topoId);
-        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
-                "SUPERVISOR", 8080, la, "worker", new HashMap<String, Object>(), ops);
-
-        MockContainer spy = spy(mc);
-        doReturn(false).when(spy).isOnWindows();
-        when(spy.getAllPids()).thenReturn(Collections.singleton(0L));
-        InputStream psout = new ByteArrayInputStream("USER\nmockuser".getBytes());
-        when(spy.getPosixProcessInputStream(0)).thenReturn(psout);
-
-        Assert.assertFalse(spy.areAllProcessesDead());
-
-        psout = new ByteArrayInputStream("USER\n".getBytes());
-        when(spy.getPosixProcessInputStream(0)).thenReturn(psout);
-
-        Assert.assertTrue(spy.areAllProcessesDead());
-    }
-
-    @Test
-    public void testAreAllProcessesDeadWindows() throws Exception {
-        final String topoId = "test_topology";
-        final Map<String, Object> superConf = new HashMap<>();
-        AdvancedFSOps ops = mock(AdvancedFSOps.class);
-        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
-
-        LocalAssignment la = new LocalAssignment();
-        la.set_topology_id(topoId);
-        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
-                "SUPERVISOR", 8080, la, "worker", new HashMap<String, Object>(), ops);
-        MockContainer spy = spy(mc);
-        doReturn(true).when(spy).isOnWindows();
-        when(spy.getAllPids()).thenReturn(Collections.singleton(0L));
-        InputStream psout = new ByteArrayInputStream("User Name:    exampleDomain\\exampleUser".getBytes());
-        doReturn(psout).when(spy).getWindowsProcessInputStream(0L);
-
-        Assert.assertFalse(spy.areAllProcessesDead());
-
-        psout = new ByteArrayInputStream("".getBytes());
-        doReturn(psout).when(spy).getWindowsProcessInputStream(0L);
-
-        Assert.assertTrue(spy.areAllProcessesDead());
-    }
 }