You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/03 02:09:01 UTC

[1/3] storm git commit: STORM-2215: validate blobs are present before submitting

Repository: storm
Updated Branches:
  refs/heads/1.x-branch f43dabfa8 -> b0a44807e


STORM-2215: validate blobs are present before submitting


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96b6f60c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96b6f60c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96b6f60c

Branch: refs/heads/1.x-branch
Commit: 96b6f60c2f30334a8e78dba9d5e789e67e7d69a9
Parents: f43dabf
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Nov 22 13:50:41 2016 -0600
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Dec 3 11:08:04 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  9 ++++++---
 .../jvm/org/apache/storm/StormSubmitter.java    | 16 +++++++++++++--
 .../apache/storm/blobstore/NimbusBlobStore.java |  7 ++++++-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 21 ++++++++++++++++++++
 .../org/apache/storm/TestConfigValidate.java    | 17 ++++++++++++++++
 5 files changed, 64 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 2732d81..46ca121 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -27,7 +27,7 @@
            [java.util Collections List HashMap]
            [org.apache.storm.generated NimbusSummary])
   (:import [java.nio ByteBuffer]
-           [java.util Collections List HashMap ArrayList Iterator])
+           [java.util Collections List HashMap ArrayList Iterator HashSet])
   (:import [org.apache.storm.blobstore AtomicOutputStream BlobStoreAclHandler
                                        InputStreamWithMeta KeyFilter KeySequenceNumber BlobSynchronizer BlobStoreUtils])
   (:import [java.io File FileOutputStream FileInputStream])
@@ -62,7 +62,8 @@
   (:import [org.apache.storm.utils VersionInfo Time]
            (org.apache.storm.metric ClusterMetricsConsumerExecutor)
            (org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo DataPoint IClusterMetricsConsumer$SupervisorInfo)
-           (org.apache.storm Config))
+           (org.apache.storm Config)
+           (com.google.common.collect Sets))
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
   (:require [metrics.meters :refer [defmeter mark!]])
@@ -1639,7 +1640,9 @@
           (.validate ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus)
                      storm-name
                      topo-conf
-                     topology))
+                     topology)
+          (Utils/validateTopologyBlobStoreMap topo-conf (Sets/newHashSet ^Iterator (.listKeys blob-store)))
+          )
         (swap! (:submitted-count nimbus) inc)
         (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
               credentials (.get_creds submitOptions)

http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
index 53771fd..effeed4 100644
--- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.storm;
 
+import com.google.common.collect.Sets;
+
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -25,7 +27,9 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.storm.blobstore.NimbusBlobStore;
 import org.apache.storm.dependency.DependencyPropertiesParser;
 import org.apache.storm.dependency.DependencyUploader;
 import org.apache.storm.hooks.SubmitterHookException;
@@ -550,12 +554,13 @@ public class StormSubmitter {
         public void onCompleted(String srcFile, String targetFile, long totalBytes);
     }
 
-    private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+    private static void validateConfs(Map<String, Object> stormConf, StormTopology topology) throws IllegalArgumentException, InvalidTopologyException {
         ConfigValidation.validateFields(stormConf);
         validateTopologyWorkerMaxHeapSizeMBConfigs(stormConf, topology);
+        Utils.validateTopologyBlobStoreMap(stormConf, getListOfKeysFromBlobStore(stormConf));
     }
 
-    private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map stormConf, StormTopology topology) {
+    private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map<String, Object> stormConf, StormTopology topology) {
         double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
         Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
         if(topologyWorkerMaxHeapSize < largestMemReq) {
@@ -583,4 +588,11 @@ public class StormSubmitter {
         }
         return largestMemoryOperator;
     }
+
+    private static Set<String> getListOfKeysFromBlobStore(Map<String, Object> stormConf) {
+        try (NimbusBlobStore client = new NimbusBlobStore()) {
+            client.prepare(stormConf);
+            return Sets.newHashSet(client.listKeys());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
index 009a07c..14046ef 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/NimbusBlobStore.java
@@ -45,7 +45,7 @@ import java.util.NoSuchElementException;
  * For local blob store it is also the client facing API for
  * supervisor in order to download blobs from nimbus.
  */
-public class NimbusBlobStore extends ClientBlobStore {
+public class NimbusBlobStore extends ClientBlobStore implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(NimbusBlobStore.class);
 
     public class NimbusKeyIterator implements Iterator<String> {
@@ -420,4 +420,9 @@ public class NimbusBlobStore extends ClientBlobStore {
             client = null;
         }
     }
+
+    @Override
+    public void close() {
+        shutdown();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index c01934e..ed5c552 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -132,6 +132,8 @@ import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import org.apache.storm.generated.InvalidTopologyException;
+
 import clojure.lang.RT;
 
 public class Utils {
@@ -1337,6 +1339,25 @@ public class Utils {
         }
     }
 
+    public static void validateTopologyBlobStoreMap(Map<String, ?> stormConf, Set<String> blobStoreKeys) throws InvalidTopologyException {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> blobStoreMap = (Map<String, Object>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        if (blobStoreMap != null) {
+            Set<String> mapKeys = blobStoreMap.keySet();
+            Set<String> missingKeys = new HashSet<>();
+
+            for (String key : mapKeys) {
+                if (!blobStoreKeys.contains(key)) {
+                    missingKeys.add(key);
+                }
+            }
+            if (!missingKeys.isEmpty()) {
+                throw new InvalidTopologyException("The topology blob store map does not " +
+                        "contain the valid keys to launch the topology " + missingKeys);
+            }
+        }
+    }
+    
     /**
      * Given a File input it will unzip the file in a the unzip directory
      * passed as the second parameter

http://git-wip-us.apache.org/repos/asf/storm/blob/96b6f60c/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java
index 5fe8033..0e08153 100644
--- a/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-core/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm;
 
+import org.apache.storm.generated.InvalidTopologyException;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.validation.ConfigValidation.*;
@@ -31,6 +32,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -80,6 +82,21 @@ public class TestConfigValidate {
         ConfigValidation.validateFields(conf);
     }
 
+    @Test(expected = InvalidTopologyException.class)
+    public void testValidateTopologyBlobStoreMap() throws InvalidTopologyException {
+        Map<String,Map> stormConf = new HashMap<>();
+        Map<String,Map> topologyMap = new HashMap<>();
+        topologyMap.put("key1", new HashMap<String,String>());
+        topologyMap.put("key2", new HashMap<String,String>());
+        stormConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topologyMap);
+        HashSet<String> keySet = new HashSet<>();
+        keySet.add("key1");
+        keySet.add("key2");
+        Utils.validateTopologyBlobStoreMap(stormConf, keySet);
+        keySet.remove("key2");
+        Utils.validateTopologyBlobStoreMap(stormConf, keySet);
+    }
+
     @Test
     public void defaultYamlTest() throws InvocationTargetException, NoSuchMethodException, NoSuchFieldException, InstantiationException, IllegalAccessException {
         Map conf = Utils.readStormConfig();


[3/3] storm git commit: STORM-2215: CHANGELOG

Posted by ka...@apache.org.
STORM-2215: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b0a44807
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b0a44807
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b0a44807

Branch: refs/heads/1.x-branch
Commit: b0a44807e65677866532508ac366cbb875f61683
Parents: aa375f5
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Dec 3 11:08:47 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Dec 3 11:08:47 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b0a44807/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 27adeb2..6243d6f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2215: validate blobs are present before submitting
  * STORM-2170: [Storm SQL] Add built-in socket datasource to runtime
  * STORM-2226: Fix kafka spout offset lag ui for kerberized kafka
  * STORM-2224: Exposed a method to override in computing the field from given tuple in FieldSelector


[2/3] storm git commit: Merge branch 'STORM-2215-1.x-merge' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-2215-1.x-merge' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aa375f50
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aa375f50
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aa375f50

Branch: refs/heads/1.x-branch
Commit: aa375f50c258ac45cc7ddfabc9a5310e21daf1f6
Parents: f43dabf 96b6f60
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Dec 3 11:08:12 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sat Dec 3 11:08:12 2016 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  9 ++++++---
 .../jvm/org/apache/storm/StormSubmitter.java    | 16 +++++++++++++--
 .../apache/storm/blobstore/NimbusBlobStore.java |  7 ++++++-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 21 ++++++++++++++++++++
 .../org/apache/storm/TestConfigValidate.java    | 17 ++++++++++++++++
 5 files changed, 64 insertions(+), 6 deletions(-)
----------------------------------------------------------------------