You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/08/31 17:19:24 UTC
[storm] branch master updated: [STORM-3501] local cluster worker
restarts because of missing resources folder (#3120)
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 9c47a02 [STORM-3501] local cluster worker restarts because of missing resources folder (#3120)
9c47a02 is described below
commit 9c47a02e52173b55c0184f3d9ecfa4b7d2a79984
Author: Diogo Monteiro <di...@gmail.com>
AuthorDate: Sat Aug 31 18:19:14 2019 +0100
[STORM-3501] local cluster worker restarts because of missing resources folder (#3120)
* [STORM-3501] local cluster worker restarts because of missing resources folder
---
.../storm/daemon/supervisor/AdvancedFSOps.java | 12 +++
.../storm/daemon/supervisor/IAdvancedFSOps.java | 10 +++
.../storm/localizer/LocallyCachedTopologyBlob.java | 9 +-
.../java/org/apache/storm/utils/ServerUtils.java | 14 +++
.../apache/storm/localizer/AsyncLocalizerTest.java | 99 ++++++++++++++++++++++
5 files changed, 141 insertions(+), 3 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
index 23e91bf..d2a9ced 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/AdvancedFSOps.java
@@ -97,6 +97,18 @@ public class AdvancedFSOps implements IAdvancedFSOps {
}
/**
+ * Moves a file to a given destination.
+ *
+ * @param fromFile file to move
+ * @param toFile where to move it
+ * @throws IOException on any error
+ */
+ @Override
+ public void moveFile(File fromFile, File toFile) throws IOException {
+ Files.move(fromFile.toPath(), toFile.toPath());
+ }
+
+ /**
* Check whether supports atomic directory move.
* @return true if an atomic directory move works, else false
*/
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
index f55ba7f..ccb4a1b 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/IAdvancedFSOps.java
@@ -46,6 +46,16 @@ public interface IAdvancedFSOps {
*/
void moveDirectoryPreferAtomic(File fromDir, File toDir) throws IOException;
+
+ /**
+ * Moves a file to a given destination.
+ *
+ * @param fromFile file to move
+ * @param toFile where to move it
+ * @throws IOException on any error
+ */
+ void moveFile(File fromFile, File toFile) throws IOException;
+
/**
* Check whether supports atomic directory move.
* @return true if an atomic directory move works, else false
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
index 4ba41f9..babed56 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedTopologyBlob.java
@@ -139,9 +139,8 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
if (isLocalMode && type == TopologyBlobType.TOPO_JAR) {
LOG.debug("DOWNLOADING LOCAL JAR to TEMP LOCATION... {}", topologyId);
//This is a special case where the jar was not uploaded so we will not download it (it is already on the classpath)
- ClassLoader classloader = Thread.currentThread().getContextClassLoader();
String resourcesJar = resourcesJar();
- URL url = classloader.getResource(ServerConfigUtils.RESOURCES_SUBDIR);
+ URL url = ServerUtils.getResourceFromClassloader(ServerConfigUtils.RESOURCES_SUBDIR);
Path extractionDest = topologyBasicBlobsRootDir.resolve(type.getTempExtractionDir(LOCAL_MODE_JAR_VERSION));
if (resourcesJar != null) {
LOG.info("Extracting resources from jar at {} to {}", resourcesJar, extractionDest);
@@ -154,6 +153,10 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
} else {
fsOps.copyDirectory(new File(url.getFile()), extractionDest.toFile());
}
+ } else if (!fsOps.fileExists(extractionDest)) {
+ // if we can't find the resources directory in a resources jar or in the classpath just create an empty
+ // resources directory. This way we can check later that the topology jar was fully downloaded.
+ fsOps.forceMkdir(extractionDest);
}
return LOCAL_MODE_JAR_VERSION;
}
@@ -225,7 +228,7 @@ public class LocallyCachedTopologyBlob extends LocallyCachedBlob {
}
if (!(isLocalMode && type == TopologyBlobType.TOPO_JAR)) {
//Don't try to move the JAR file in local mode, it does not exist because it was not uploaded
- Files.move(tempLoc, dest);
+ fsOps.moveFile(tempLoc.toFile(), dest.toFile());
}
synchronized (LocallyCachedTopologyBlob.class) {
//This is a bit ugly, but it works. In order to maintain the same directory structure that existed before
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index eb72594..c026160 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
+import java.net.URL;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -234,6 +235,14 @@ public class ServerUtils {
return _instance.currentClasspathImpl();
}
+
+ /**
+ * Returns the current thread classloader.
+ */
+ public static URL getResourceFromClassloader(String name) {
+ return _instance.getResourceFromClassloaderImpl(name);
+ }
+
/**
* Determines if a zip archive contains a particular directory.
*
@@ -748,6 +757,11 @@ public class ServerUtils {
return System.getProperty("java.class.path");
}
+
+ public URL getResourceFromClassloaderImpl(String name) {
+ return Thread.currentThread().getContextClassLoader().getResource(name);
+ }
+
public void downloadResourcesAsSupervisorImpl(String key, String localFile,
ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException, IOException {
final int maxRetryAttempts = 2;
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 8228702..550c808 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -18,6 +18,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.storm.blobstore.BlobStoreAclHandler.WORLD_EVERYTHING;
import static org.apache.storm.localizer.LocalizedResource.USERCACHE;
+import static org.apache.storm.localizer.LocallyCachedTopologyBlob.LOCAL_MODE_JAR_VERSION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -249,6 +251,103 @@ public class AsyncLocalizerTest {
}
}
+
+ @Test
+ public void testRequestDownloadTopologyBlobsLocalMode() throws Exception {
+ // tests download of topology blobs in local mode on a topology without resources folder
+ final String topoId = "TOPO-12345";
+ final String user = "user";
+ LocalAssignment la = new LocalAssignment();
+ la.set_topology_id(topoId);
+ la.set_owner(user);
+ ExecutorInfo ei = new ExecutorInfo();
+ ei.set_task_start(1);
+ ei.set_task_end(1);
+ la.add_to_executors(ei);
+ final String topoName = "TOPO";
+ final int port = 8080;
+ final String simpleLocalName = "simple.txt";
+ final String simpleKey = "simple";
+
+ final String stormLocal = "/tmp/storm-local/";
+ final File userDir = new File(stormLocal, user);
+ final String stormRoot = stormLocal + topoId + "/";
+
+ final String localizerRoot = getTestLocalizerRoot();
+
+ final StormTopology st = new StormTopology();
+ st.set_spouts(new HashMap<>());
+ st.set_bolts(new HashMap<>());
+ st.set_state_spouts(new HashMap<>());
+
+ Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
+ Map<String, Object> simple = new HashMap<>();
+ simple.put("localname", simpleLocalName);
+ simple.put("uncompress", false);
+ topoBlobMap.put(simpleKey, simple);
+
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.STORM_LOCAL_DIR, stormLocal);
+ conf.put(Config.STORM_CLUSTER_MODE, "local");
+ AdvancedFSOps ops = mock(AdvancedFSOps.class);
+ ConfigUtils mockedCU = mock(ConfigUtils.class);
+ ServerUtils mockedSU = mock(ServerUtils.class);
+
+ Map<String, Object> topoConf = new HashMap<>(conf);
+ topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
+ topoConf.put(Config.TOPOLOGY_NAME, topoName);
+
+ List<LocalizedResource> localizedList = new ArrayList<>();
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
+ LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user, metricsRegistry);
+ localizedList.add(simpleLocal);
+
+ AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot, metricsRegistry));
+ ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
+ ServerUtils origSU = ServerUtils.setInstance(mockedSU);
+
+ try {
+ when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
+ when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
+ when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
+
+ doReturn(mockblobstore).when(bl).getClientBlobStore();
+ doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
+ doReturn(localizedList).when(bl).getBlobs(any(List.class), any(), any());
+ doReturn(mock(OutputStream.class)).when(ops).getOutputStream(any());
+
+ ReadableBlobMeta blobMeta = new ReadableBlobMeta();
+ blobMeta.set_version(1);
+ doReturn(blobMeta).when(mockblobstore).getBlobMeta(any());
+ when(mockblobstore.getBlob(any())).thenAnswer(invocation -> new TestInputStreamWithMeta(LOCAL_MODE_JAR_VERSION));
+
+ Future<Void> f = bl.requestDownloadTopologyBlobs(la, port, null);
+ f.get(20, TimeUnit.SECONDS);
+
+ verify(bl).getLocalUserFileCacheDir(user);
+
+ verify(ops).fileExists(userDir);
+ verify(ops).forceMkdir(userDir);
+
+ verify(bl).getBlobs(any(List.class), any(), any());
+
+ Path extractionDir = Paths.get(stormRoot,
+ LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR.getTempExtractionDir(LOCAL_MODE_JAR_VERSION));
+
+ // make sure resources dir is created.
+ verify(ops).forceMkdir(extractionDir);
+
+ } finally {
+ try {
+ ConfigUtils.setInstance(orig);
+ ServerUtils.setInstance(origSU);
+ bl.close();
+ } catch (Throwable e) {
+ LOG.error("ERROR trying to close an object", e);
+ }
+ }
+ }
+
@Before
public void setUp() throws Exception {
baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-" + UUID.randomUUID());