You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/08/31 17:20:29 UTC

[storm] branch 2.1.x-branch updated: [STORM-3501] local cluster worker restarts because of missing resources folder (#3120)

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.1.x-branch by this push:
     new 71492f5  [STORM-3501] local cluster worker restarts because of missing resources folder (#3120)
71492f5 is described below

commit 71492f51789e6b7b0c842ae6a6bac48dc03f645a
Author: Diogo Monteiro <di...@gmail.com>
AuthorDate: Sat Aug 31 18:19:14 2019 +0100

    [STORM-3501] local cluster worker restarts because of missing resources folder (#3120)
    
    * [STORM-3501] local cluster worker restarts because of missing resources folder
---
 .../storm/daemon/supervisor/AdvancedFSOps.java     | 12 +++
 .../storm/daemon/supervisor/IAdvancedFSOps.java    | 10 +++
 .../storm/localizer/LocallyCachedTopologyBlob.java |  9 +-
 .../java/org/apache/storm/utils/ServerUtils.java   | 14 +++
 .../apache/storm/localizer/AsyncLocalizerTest.java | 99 ++++++++++++++++++++++
 5 files changed, 141 insertions(+), 3 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index 814ec12..a9bafd3 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -97,6 +97,18 @@ public class AdvancedFSOps implements IAdvancedFSOps {
     }
 
     /**
+     * Moves a file to a given destination.
+     *
+     * @param fromFile file to move
+     * @param toFile where to move it
+     * @throws IOException on any error
+     */
+    @Override
+    public void moveFile(File fromFile, File toFile) throws IOException {
+        Files.move(fromFile.toPath(), toFile.toPath());
+    }
+
+    /**
      * Check whether supports atomic directory move.
      * @return true if an atomic directory move works, else false
      */
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
index f55ba7f..ccb4a1b 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
@@ -46,6 +46,16 @@ public interface IAdvancedFSOps {
      */
     void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException;
 
+
+    /**
+     * Moves a file to a given destination.
+     *
+     * @param fromFile file to move
+     * @param toFile where to move it
+     * @throws IOException on any error
+     */
+    void moveFile(File fromFile, File toFile) throws IOException;
+
     /**
      * Check whether supports atomic directory move.
      * @return true if an atomic directory move works, else false
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
index 3b019f4..c57ebed 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
@@ -139,9 +139,8 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
         if (isLocalMode && type == TopologyBlobType.TOPO_JAR) {
             LOG.debug("DOWNLOADING LOCAL JAR to TEMP LOCATION... {}", topologyId);
             //This is a special case where the jar was not uploaded so we will not download it (it is already on the classpath)
-            ClassLoader classloader = Thread.currentThread().getContextClassLoader();
             String resourcesJar = resourcesJar();
-            URL url = classloader.getResource(ServerConfigUtils.RESOURCES_SUBDIR);
+            URL url = ServerUtils.getResourceFromClassloader(ServerConfigUtils.RESOURCES_SUBDIR);
             Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(LOCAL_MODE_JAR_VERSION));
             if (resourcesJar != null) {
                 LOG.info("Extracting resources from jar at {} to {}", resourcesJar, extractionDest);
@@ -154,6 +153,10 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
                 } else {
                     fsOps.copyDirectory(new File(url.getFile()), extractionDest.toFile());
                 }
+            } else if (!fsOps.fileExists(extractionDest)) {
+                // If the resources directory cannot be found in a resources jar or on the classpath, create an empty
+                // resources directory so that later checks can confirm the topology jar was fully downloaded.
+                fsOps.forceMkdir(extractionDest);
             }
             return LOCAL_MODE_JAR_VERSION;
         }
@@ -225,7 +228,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
         }
         if (!(isLocalMode && type == TopologyBlobType.TOPO_JAR)) {
             //Don't try to move the JAR file in local mode, it does not exist because it was not uploaded
-            Files.move(tempLoc, dest);
+            fsOps.moveFile(tempLoc.toFile(), dest.toFile());
         }
         synchronized (LocallyCachedTopologyBlob.class) {
             //This is a bit ugly, but it works.  In order to maintain the same directory structure that existed before
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 85ade01..5c2474c 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.RandomAccessFile;
+import java.net.URL;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -236,6 +237,14 @@ public class ServerUtils {
         return _instance.currentClasspathImpl();
     }
 
+
+    /**
+     *  Returns the URL of the named resource, looked up through the current thread's context classloader.
+     */
+    public static URL getResourceFromClassloader(String name) {
+        return _instance.getResourceFromClassloaderImpl(name);
+    }
+
     /**
      * Determines if a zip archive contains a particular directory.
      *
@@ -753,6 +762,11 @@ public class ServerUtils {
         return System.getProperty("java.class.path");
     }
 
+
+    public URL getResourceFromClassloaderImpl(String name) {
+        return Thread.currentThread().getContextClassLoader().getResource(name);
+    }
+
     public void downloadResourcesAsSupervisorImpl(String key, String localFile,
                                                   ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
         final int MAX_RETRY_ATTEMPTS = 2;
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 8228702..550c808 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -18,6 +18,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.storm.blobstore.BlobStoreAclHandler.WORLD_EVERYTHING;
 import static org.apache.storm.localizer.LocalizedResource.USERCACHE;
+import static org.apache.storm.localizer.LocallyCachedTopologyBlob.LOCAL_MODE_JAR_VERSION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -249,6 +251,103 @@ public class AsyncLocalizerTest {
         }
     }
 
+
+    @Test
+    public void testRequestDownloadTopologyBlobsLocalMode() throws Exception {
+        // tests download of topology blobs in local mode on a topology without resources folder
+        final String topoId = "TOPO-12345";
+        final String user = "user";
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        la.set_owner(user);
+        ExecutorInfo ei = new ExecutorInfo();
+        ei.set_task_start(1);
+        ei.set_task_end(1);
+        la.add_to_executors(ei);
+        final String topoName = "TOPO";
+        final int port = 8080;
+        final String simpleLocalName = "simple.txt";
+        final String simpleKey = "simple";
+
+        final String stormLocal = "/tmp/storm-local/";
+        final File userDir = new File(stormLocal, user);
+        final String stormRoot = stormLocal + topoId + "/";
+
+        final String localizerRoot = getTestLocalizerRoot();
+
+        final StormTopology st = new StormTopology();
+        st.set_spouts(new HashMap<>());
+        st.set_bolts(new HashMap<>());
+        st.set_state_spouts(new HashMap<>());
+
+        Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
+        Map<String, Object> simple = new HashMap<>();
+        simple.put("localname", simpleLocalName);
+        simple.put("uncompress", false);
+        topoBlobMap.put(simpleKey, simple);
+
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        conf.put(Config.STORM_CLUSTER_MODE, "local");
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        ConfigUtils mockedCU = mock(ConfigUtils.class);
+        ServerUtils mockedSU = mock(ServerUtils.class);
+
+        Map<String, Object> topoConf = new HashMap<>(conf);
+        topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
+        topoConf.put(Config.TOPOLOGY_NAME, topoName);
+
+        List<LocalizedResource> localizedList = new ArrayList<>();
+        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+        LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user, metricsRegistry);
+        localizedList.add(simpleLocal);
+
+        AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot, metricsRegistry));
+        ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
+        ServerUtils origSU = ServerUtils.setInstance(mockedSU);
+
+        try {
+            when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
+            when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
+            when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
+
+            doReturn(mockblobstore).when(bl).getClientBlobStore();
+            doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
+            doReturn(localizedList).when(bl).getBlobs(any(List.class), any(), any());
+            doReturn(mock(OutputStream.class)).when(ops).getOutputStream(any());
+
+            ReadableBlobMeta blobMeta = new ReadableBlobMeta();
+            blobMeta.set_version(1);
+            doReturn(blobMeta).when(mockblobstore).getBlobMeta(any());
+            when(mockblobstore.getBlob(any())).thenAnswer(invocation -> new TestInputStreamWithMeta(LOCAL_MODE_JAR_VERSION));
+
+            Future<Void> f = bl.requestDownloadTopologyBlobs(la, port, null);
+            f.get(20, TimeUnit.SECONDS);
+
+            verify(bl).getLocalUserFileCacheDir(user);
+
+            verify(ops).fileExists(userDir);
+            verify(ops).forceMkdir(userDir);
+
+            verify(bl).getBlobs(any(List.class), any(), any());
+
+            Path extractionDir = Paths.get(stormRoot,
+                    LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR.getTempExtractionDir(LOCAL_MODE_JAR_VERSION));
+
+            // make sure resources dir is created.
+            verify(ops).forceMkdir(extractionDir);
+
+        } finally {
+            try {
+                ConfigUtils.setInstance(orig);
+                ServerUtils.setInstance(origSU);
+                bl.close();
+            } catch (Throwable e) {
+                LOG.error("ERROR trying to close an object", e);
+            }
+        }
+    }
+
     @Before
     public void setUp() throws Exception {
         baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-" + UUID.randomUUID());