You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/12/17 22:09:20 UTC

[1/6] storm git commit: zk slowing down due to many connections

Repository: storm
Updated Branches:
  refs/heads/master f46fec327 -> cefc84309


zk slowing down due to many connections


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f27e5bce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f27e5bce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f27e5bce

Branch: refs/heads/master
Commit: f27e5bce2c7fe2e06cef7746227a01099d7d32d3
Parents: 39163bf
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 8 16:58:28 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 8 16:58:28 2015 -0600

----------------------------------------------------------------------
 .../storm/blobstore/LocalFsBlobStore.java       | 35 +++++++-------------
 .../test/clj/backtype/storm/nimbus_test.clj     |  5 +--
 .../backtype/storm/localizer/LocalizerTest.java |  4 +--
 3 files changed, 16 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f27e5bce/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
index ac7a4bd..77845b0 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -74,11 +74,13 @@ public class LocalFsBlobStore extends BlobStore {
     private FileBlobStoreImpl fbs;
     private final int allPermissions = READ | WRITE | ADMIN;
     private Map conf;
+    private CuratorFramework zkClient;
 
     @Override
     public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
         this.conf = conf;
         this.nimbusInfo = nimbusInfo;
+        zkClient = BlobStoreUtils.createZKClient(conf);
         if (overrideBase == null) {
             overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
             if (overrideBase == null) {
@@ -254,27 +256,22 @@ public class LocalFsBlobStore extends BlobStore {
 
     @Override
     public void shutdown() {
+        if (zkClient != null) {
+            zkClient.close();
+        }
     }
 
     @Override
     public int getBlobReplication(String key, Subject who) throws Exception {
-        CuratorFramework zkClient = null;
         int replicationCount = 0;
-        try {
-            validateKey(key);
-            SettableBlobMeta meta = getStoredBlobMeta(key);
-            _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
-            zkClient = BlobStoreUtils.createZKClient(conf);
-            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
-                zkClient.close();
-                return 0;
-            }
-            replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
-        } finally {
-            if (zkClient != null) {
-                zkClient.close();
-            }
+        validateKey(key);
+        SettableBlobMeta meta = getStoredBlobMeta(key);
+        _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
+        if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
+            zkClient.close();
+            return 0;
         }
+        replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
         return replicationCount;
     }
 
@@ -287,11 +284,9 @@ public class LocalFsBlobStore extends BlobStore {
     //This additional check and download is for nimbus high availability in case you have more than one nimbus
     public boolean checkForBlobOrDownload(String key) {
         boolean checkBlobDownload = false;
-        CuratorFramework zkClient = null;
         try {
             List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
             if (!keyList.contains(key)) {
-                zkClient = BlobStoreUtils.createZKClient(conf);
                 if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) != null) {
                     Set<NimbusInfo> nimbusSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
                     if (BlobStoreUtils.downloadMissingBlob(conf, this, key, nimbusSet)) {
@@ -303,18 +298,12 @@ public class LocalFsBlobStore extends BlobStore {
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
-        } finally {
-            if (zkClient != null) {
-                zkClient.close();
-            }
         }
         return checkBlobDownload;
     }
 
     public void checkForBlobUpdate(String key) {
-        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
         BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo);
-        zkClient.close();
     }
 
     public void fullCleanup(long age) throws IOException {

http://git-wip-us.apache.org/repos/asf/storm/blob/f27e5bce/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 0847883..4989254 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -1238,10 +1238,11 @@
   (testing "nimbus-data uses correct ACLs"
     (let [scheme "digest"
           digest "storm:thisisapoorpassword"
-          auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
+          auth-conf (merge (read-storm-config)
+                    {STORM-ZOOKEEPER-AUTH-SCHEME scheme
                      STORM-ZOOKEEPER-AUTH-PAYLOAD digest
                      STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.DefaultPrincipalToLocal"
-                     NIMBUS-THRIFT-PORT 6666}
+                     NIMBUS-THRIFT-PORT 6666})
           expected-acls nimbus/NIMBUS-ZK-ACLS
           fake-inimbus (reify INimbus (getForcedScheduler [this] nil))]
       (stubbing [nimbus-topo-history-state nil

http://git-wip-us.apache.org/repos/asf/storm/blob/f27e5bce/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java b/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
index b47a3b4..dfd8294 100644
--- a/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
+++ b/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
@@ -517,11 +517,9 @@ public class LocalizerTest {
 
   @Test(expected = KeyNotFoundException.class)
   public void testKeyNotFoundException() throws Exception {
-    Map conf = new HashMap();
+    Map conf = Utils.readStormConfig();
     String key1 = "key1";
     conf.put(Config.STORM_LOCAL_DIR, "local");
-    conf.put(Config.BLOBSTORE_SUPERUSER, "superuser");
-    conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "backtype.storm.security.auth.DefaultPrincipalToLocal");
     LocalFsBlobStore bs = new LocalFsBlobStore();
     LocalFsBlobStore spy = spy(bs);
     Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1);


[4/6] storm git commit: revert backpressure=false to true

Posted by kn...@apache.org.
revert backpressure=false to true


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd83da80
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd83da80
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd83da80

Branch: refs/heads/master
Commit: fd83da80460c70db1eed127d2a977ff7b7d9110c
Parents: a710bb6
Author: Sanket <sc...@untilservice-lm>
Authored: Mon Dec 14 23:27:26 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Mon Dec 14 23:27:26 2015 -0600

----------------------------------------------------------------------
 conf/defaults.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fd83da80/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8d9e3a3..b7c4677 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -179,7 +179,7 @@ task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
 
 # now should be null by default
-topology.backpressure.enable: false
+topology.backpressure.enable: true
 backpressure.disruptor.high.watermark: 0.9
 backpressure.disruptor.low.watermark: 0.4
 


[2/6] storm git commit: removing close method as shutdown does that

Posted by kn...@apache.org.
removing close method as shutdown does that


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c511a7e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c511a7e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c511a7e5

Branch: refs/heads/master
Commit: c511a7e5581c2aff5c4ddd25c5507950fea08005
Parents: f27e5bc
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 8 17:12:02 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 8 17:12:02 2015 -0600

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c511a7e5/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
index 77845b0..7f075a1 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -268,7 +268,6 @@ public class LocalFsBlobStore extends BlobStore {
         SettableBlobMeta meta = getStoredBlobMeta(key);
         _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
         if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
-            zkClient.close();
             return 0;
         }
         replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();


[5/6] storm git commit: Merge branch 'zk-slowing-down' of https://github.com/redsanket/storm

Posted by kn...@apache.org.
Merge branch 'zk-slowing-down' of https://github.com/redsanket/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/27809a17
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/27809a17
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/27809a17

Branch: refs/heads/master
Commit: 27809a1794e62038723e08b373a55bc93f4744e6
Parents: f46fec3 fd83da8
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Dec 17 15:08:11 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Dec 17 15:08:11 2015 -0600

----------------------------------------------------------------------
 log4j2/worker.xml                               |  8 ++---
 .../storm/blobstore/BlobSynchronizer.java       |  2 +-
 .../storm/blobstore/KeySequenceNumber.java      |  2 +-
 .../storm/blobstore/LocalFsBlobStore.java       | 38 +++++++-------------
 .../test/clj/backtype/storm/nimbus_test.clj     |  5 +--
 .../backtype/storm/localizer/LocalizerTest.java |  4 +--
 6 files changed, 23 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/27809a17/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------


[6/6] storm git commit: Adding STORM-1376 to CHANGELOG

Posted by kn...@apache.org.
Adding STORM-1376 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cefc8430
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cefc8430
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cefc8430

Branch: refs/heads/master
Commit: cefc843099cc6f7cf6bfd89bb9135e0263a4c0d4
Parents: 27809a1
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Dec 17 15:09:06 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Dec 17 15:09:06 2015 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cefc8430/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c2d4a4b..0709964 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1376: Performance slowdown due excessive zk connections and log-debugging
  * STORM-1395: Move JUnit dependency to top-level pom
  * STORM-1372: Merging design and usage documents for distcache
  * STORM-1393: Update the storm.log.dir function, add doc for logs


[3/6] storm git commit: removed immediateFlush = true and added synchronization for potential race conditions between supervisor download and blobstore calls

Posted by kn...@apache.org.
removed immediateFlush = true and added synchronization for potential race conditions between supervisor download and blobstore calls


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a710bb66
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a710bb66
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a710bb66

Branch: refs/heads/master
Commit: a710bb6615c16cb2787ac460b00af33267e888bd
Parents: c511a7e
Author: Sanket <sc...@untilservice-lm>
Authored: Sun Dec 13 17:41:40 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Sun Dec 13 17:41:40 2015 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                                           | 2 +-
 log4j2/worker.xml                                            | 8 ++++----
 .../src/jvm/backtype/storm/blobstore/BlobSynchronizer.java   | 2 +-
 .../src/jvm/backtype/storm/blobstore/KeySequenceNumber.java  | 2 +-
 .../src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java   | 4 ++--
 5 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b7c4677..8d9e3a3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -179,7 +179,7 @@ task.refresh.poll.secs: 10
 task.credentials.poll.secs: 30
 
 # now should be null by default
-topology.backpressure.enable: true
+topology.backpressure.enable: false
 backpressure.disruptor.high.watermark: 0.9
 backpressure.disruptor.low.watermark: 0.4
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/log4j2/worker.xml
----------------------------------------------------------------------
diff --git a/log4j2/worker.xml b/log4j2/worker.xml
index df368c6..967585b 100644
--- a/log4j2/worker.xml
+++ b/log4j2/worker.xml
@@ -22,7 +22,7 @@
     <property name="patternNoTime">%msg%n</property>
 </properties>
 <appenders>
-    <RollingFile name="A1" immediateFlush="false"
+    <RollingFile name="A1"
 		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
 		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
         <PatternLayout>
@@ -33,7 +33,7 @@
         </Policies>
         <DefaultRolloverStrategy max="9"/>
     </RollingFile>
-    <RollingFile name="STDOUT" immediateFlush="false"
+    <RollingFile name="STDOUT"
 		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
 		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
         <PatternLayout>
@@ -44,7 +44,7 @@
         </Policies>
         <DefaultRolloverStrategy max="4"/>
     </RollingFile>
-    <RollingFile name="STDERR" immediateFlush="false"
+    <RollingFile name="STDERR"
 		fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
 		filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
         <PatternLayout>
@@ -58,7 +58,7 @@
     <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
         protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
         facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
-        messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFlush="false" immediateFail="true"/>
+        messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true"/>
 </appenders>
 <loggers>
     <root level="info"> <!-- We log everything -->

http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
index abd7c86..1f20d7c 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;;
+import java.util.Set;
 
 /**
  * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper

http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
index 1cddac0..9307993 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
@@ -130,7 +130,7 @@ public class KeySequenceNumber {
         this.nimbusInfo = nimbusInfo;
     }
 
-    public int getKeySequenceNumber(Map conf) {
+    public synchronized int getKeySequenceNumber(Map conf) {
         TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
         CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
index 7f075a1..b8daad2 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -281,7 +281,7 @@ public class LocalFsBlobStore extends BlobStore {
     }
 
     //This additional check and download is for nimbus high availability in case you have more than one nimbus
-    public boolean checkForBlobOrDownload(String key) {
+    public synchronized boolean checkForBlobOrDownload(String key) {
         boolean checkBlobDownload = false;
         try {
             List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
@@ -301,7 +301,7 @@ public class LocalFsBlobStore extends BlobStore {
         return checkBlobDownload;
     }
 
-    public void checkForBlobUpdate(String key) {
+    public synchronized void checkForBlobUpdate(String key) {
         BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo);
     }