You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/03 02:09:01 UTC
[1/3] storm git commit: STORM-2215: validate blobs are present before
submitting
Repository: storm
Updated Branches:
refs/heads/1.x-branch f43dabfa8 -> b0a44807e
STORM-2215: validate blobs are present before submitting
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96b6f60c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96b6f60c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96b6f60c
Branch: refs/heads/1.x-branch
Commit: 96b6f60c2f30334a8e78dba9d5e789e67e7d69a9
Parents: f43dabf
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Nov 22 13:50:41 2016 -0600
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Dec 3 11:08:04 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 9 ++++++---
.../jvm/org/apache/storm/StormSubmitter.java | 16 +++++++++++++--
.../apache/storm/blobstore/NimbusBlobStore.java | 7 ++++++-
.../src/jvm/org/apache/storm/utils/Utils.java | 21 ++++++++++++++++++++
.../org/apache/storm/TestConfigValidate.java | 17 ++++++++++++++++
5 files changed, 64 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 2732d81..46ca121 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -27,7 +27,7 @@
[java.util Collections List HashMap]
[org.apache.storm.generated NimbusSummary])
(:import [java.nio ByteBuffer]
- [java.util Collections List HashMap ArrayList Iterator])
+ [java.util Collections List HashMap ArrayList Iterator HashSet])
(:import [org.apache.storm.blobstore AtomicOutputStream BlobStoreAclHandler
InputStreamWithMeta KeyFilter KeySequenceNumber BlobSynchronizer BlobStoreUtils])
(:import [java.io File FileOutputStream FileInputStream])
@@ -62,7 +62,8 @@
(:import [org.apache.storm.utils VersionInfo Time]
(org.apache.storm.metric ClusterMetricsConsumerExecutor)
(org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo DataPoint IClusterMetricsConsumer$SupervisorInfo)
- (org.apache.storm Config))
+ (org.apache.storm Config)
+ (com.google.common.collect Sets))
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
(:require [metrics.meters :refer [defmeter mark!]])
@@ -1639,7 +1640,9 @@
(.validate ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus)
storm-name
topo-conf
- topology))
+ topology)
+ (Utils/validateTopologyBlobStoreMap topo-conf (Sets/newHashSet ^Iterator (.listKeys blob-store)))
+ )
(swap! (:submitted-count nimbus) inc)
(let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
credentials (.get_creds submitOptions)
http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
index 53771fd..effeed4 100644
--- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
@@ -17,6 +17,8 @@
*/
package org.apache.storm;
+import com.google.common.collect.Sets;
+
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -25,7 +27,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import org.apache.storm.blobstore.NimbusBlobStore;
import org.apache.storm.dependency.DependencyPropertiesParser;
import org.apache.storm.dependency.DependencyUploader;
import org.apache.storm.hooks.SubmitterHookException;
@@ -550,12 +554,13 @@ public class StormSubmitter {
public void onCompleted(String srcFile, String targetFile, long totalBytes);
}
- private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+ private static void validateConfs(Map<String, Object> stormConf, StormTopology topology) throws IllegalArgumentException, InvalidTopologyException {
ConfigValidation.validateFields(stormConf);
validateTopologyWorkerMaxHeapSizeMBConfigs(stormConf, topology);
+ Utils.validateTopologyBlobStoreMap(stormConf, getListOfKeysFromBlobStore(stormConf));
}
- private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map stormConf, StormTopology topology) {
+ private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map<String, Object> stormConf, StormTopology topology) {
double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
if(topologyWorkerMaxHeapSize < largestMemReq) {
@@ -583,4 +588,11 @@ public class StormSubmitter {
}
return largestMemoryOperator;
}
+
+ private static Set<String> getListOfKeysFromBlobStore(Map<String, Object> stormConf) {
+ try (NimbusBlobStore client = new NimbusBlobStore()) {
+ client.prepare(stormConf);
+ return Sets.newHashSet(client.listKeys());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
index 009a07c..14046ef 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
@@ -45,7 +45,7 @@ import java.util.NoSuchElementException;
* For local blob store it is also the client facing API for
* supervisor in order to download blobs from nimbus.
*/
-public class NimbusBlobStore extends ClientBlobStore {
+public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class);
public class NimbusKeyIterator implements Iterator<String> {
@@ -420,4 +420,9 @@ public class NimbusBlobStore extends ClientBlobStore {
client = null;
}
}
+
+ @Override
+ public void close() {
+ shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index c01934e..ed5c552 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -132,6 +132,8 @@ import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
+import org.apache.storm.generated.InvalidTopologyException;
+
import clojure.lang.RT;
public class Utils {
@@ -1337,6 +1339,25 @@ public class Utils {
}
}
+ public static void validateTopologyBlobStoreMap(Map<String, ?> stormConf, Set<String> blobStoreKeys) throws InvalidTopologyException {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> blobStoreMap = (Map<String, Object>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ if (blobStoreMap != null) {
+ Set<String> mapKeys = blobStoreMap.keySet();
+ Set<String> missingKeys = new HashSet<>();
+
+ for (String key : mapKeys) {
+ if (!blobStoreKeys.contains(key)) {
+ missingKeys.add(key);
+ }
+ }
+ if (!missingKeys.isEmpty()) {
+ throw new InvalidTopologyException("The topology blob store map does not " +
+ "contain the valid keys to launch the topology " + missingKeys);
+ }
+ }
+ }
+
/**
* Given a File input it will unzip the file in a the unzip directory
* passed as the second parameter
http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java
index 5fe8033..0e08153 100644
--- a/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -18,6 +18,7 @@
package org.apache.storm;
+import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.validation.ConfigValidation.*;
@@ -31,6 +32,7 @@ import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -80,6 +82,21 @@ public class TestConfigValidate {
ConfigValidation.validateFields(conf);
}
+ @Test(expected = InvalidTopologyException.class)
+ public void testValidateTopologyBlobStoreMap() throws InvalidTopologyException {
+ Map<String,Map> stormConf = new HashMap<>();
+ Map<String,Map> topologyMap = new HashMap<>();
+ topologyMap.put("key1", new HashMap<String,String>());
+ topologyMap.put("key2", new HashMap<String,String>());
+ stormConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topologyMap);
+ HashSet<String> keySet = new HashSet<>();
+ keySet.add("key1");
+ keySet.add("key2");
+ Utils.validateTopologyBlobStoreMap(stormConf, keySet);
+ keySet.remove("key2");
+ Utils.validateTopologyBlobStoreMap(stormConf, keySet);
+ }
+
@Test
public void defaultYamlTest() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
Map conf = Utils.readStormConfig();
[3/3] storm git commit: STORM-2215: CHANGELOG
Posted by ka...@apache.org.
STORM-2215: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b0a44807
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b0a44807
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b0a44807
Branch: refs/heads/1.x-branch
Commit: b0a44807e65677866532508ac366cbb875f61683
Parents: aa375f5
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Dec 3 11:08:47 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Dec 3 11:08:47 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b0a44807/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 27adeb2..6243d6f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2215: validate blobs are present before submitting
* STORM-2170: [Storm SQL] Add built-in socket datasource to runtime
* STORM-2226: Fix kafka spout offset lag ui for kerberized kafka
* STORM-2224: Exposed a method to override in computing the field from given tuple in FieldSelector
[2/3] storm git commit: Merge branch 'STORM-2215-1.x-merge' into
1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2215-1.x-merge' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aa375f50
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aa375f50
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aa375f50
Branch: refs/heads/1.x-branch
Commit: aa375f50c258ac45cc7ddfabc9a5310e21daf1f6
Parents: f43dabf 96b6f60
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Dec 3 11:08:12 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Dec 3 11:08:12 2016 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 9 ++++++---
.../jvm/org/apache/storm/StormSubmitter.java | 16 +++++++++++++--
.../apache/storm/blobstore/NimbusBlobStore.java | 7 ++++++-
.../src/jvm/org/apache/storm/utils/Utils.java | 21 ++++++++++++++++++++
.../org/apache/storm/TestConfigValidate.java | 17 ++++++++++++++++
5 files changed, 64 insertions(+), 6 deletions(-)
----------------------------------------------------------------------