You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/12/17 22:09:20 UTC
[1/6] storm git commit: zk slowing down due to many connections
Repository: storm
Updated Branches:
refs/heads/master f46fec327 -> cefc84309
zk slowing down due to many connections
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f27e5bce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f27e5bce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f27e5bce
Branch: refs/heads/master
Commit: f27e5bce2c7fe2e06cef7746227a01099d7d32d3
Parents: 39163bf
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 8 16:58:28 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 8 16:58:28 2015 -0600
----------------------------------------------------------------------
.../storm/blobstore/LocalFsBlobStore.java | 35 +++++++-------------
.../test/clj/backtype/storm/nimbus_test.clj | 5 +--
.../backtype/storm/localizer/LocalizerTest.java | 4 +--
3 files changed, 16 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f27e5bce/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
index ac7a4bd..77845b0 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -74,11 +74,13 @@ public class LocalFsBlobStore extends BlobStore {
private FileBlobStoreImpl fbs;
private final int allPermissions = READ | WRITE | ADMIN;
private Map conf;
+ private CuratorFramework zkClient;
@Override
public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
this.conf = conf;
this.nimbusInfo = nimbusInfo;
+ zkClient = BlobStoreUtils.createZKClient(conf);
if (overrideBase == null) {
overrideBase = (String)conf.get(Config.BLOBSTORE_DIR);
if (overrideBase == null) {
@@ -254,27 +256,22 @@ public class LocalFsBlobStore extends BlobStore {
@Override
public void shutdown() {
+ if (zkClient != null) {
+ zkClient.close();
+ }
}
@Override
public int getBlobReplication(String key, Subject who) throws Exception {
- CuratorFramework zkClient = null;
int replicationCount = 0;
- try {
- validateKey(key);
- SettableBlobMeta meta = getStoredBlobMeta(key);
- _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
- zkClient = BlobStoreUtils.createZKClient(conf);
- if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
- zkClient.close();
- return 0;
- }
- replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
- } finally {
- if (zkClient != null) {
- zkClient.close();
- }
+ validateKey(key);
+ SettableBlobMeta meta = getStoredBlobMeta(key);
+ _aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
+ if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
+ zkClient.close();
+ return 0;
}
+ replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
return replicationCount;
}
@@ -287,11 +284,9 @@ public class LocalFsBlobStore extends BlobStore {
//This additional check and download is for nimbus high availability in case you have more than one nimbus
public boolean checkForBlobOrDownload(String key) {
boolean checkBlobDownload = false;
- CuratorFramework zkClient = null;
try {
List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
if (!keyList.contains(key)) {
- zkClient = BlobStoreUtils.createZKClient(conf);
if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) != null) {
Set<NimbusInfo> nimbusSet = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, key);
if (BlobStoreUtils.downloadMissingBlob(conf, this, key, nimbusSet)) {
@@ -303,18 +298,12 @@ public class LocalFsBlobStore extends BlobStore {
}
} catch (Exception e) {
throw new RuntimeException(e);
- } finally {
- if (zkClient != null) {
- zkClient.close();
- }
}
return checkBlobDownload;
}
public void checkForBlobUpdate(String key) {
- CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo);
- zkClient.close();
}
public void fullCleanup(long age) throws IOException {
http://git-wip-us.apache.org/repos/asf/storm/blob/f27e5bce/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 0847883..4989254 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -1238,10 +1238,11 @@
(testing "nimbus-data uses correct ACLs"
(let [scheme "digest"
digest "storm:thisisapoorpassword"
- auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
+ auth-conf (merge (read-storm-config)
+ {STORM-ZOOKEEPER-AUTH-SCHEME scheme
STORM-ZOOKEEPER-AUTH-PAYLOAD digest
STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.DefaultPrincipalToLocal"
- NIMBUS-THRIFT-PORT 6666}
+ NIMBUS-THRIFT-PORT 6666})
expected-acls nimbus/NIMBUS-ZK-ACLS
fake-inimbus (reify INimbus (getForcedScheduler [this] nil))]
(stubbing [nimbus-topo-history-state nil
http://git-wip-us.apache.org/repos/asf/storm/blob/f27e5bce/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java b/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
index b47a3b4..dfd8294 100644
--- a/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
+++ b/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
@@ -517,11 +517,9 @@ public class LocalizerTest {
@Test(expected = KeyNotFoundException.class)
public void testKeyNotFoundException() throws Exception {
- Map conf = new HashMap();
+ Map conf = Utils.readStormConfig();
String key1 = "key1";
conf.put(Config.STORM_LOCAL_DIR, "local");
- conf.put(Config.BLOBSTORE_SUPERUSER, "superuser");
- conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "backtype.storm.security.auth.DefaultPrincipalToLocal");
LocalFsBlobStore bs = new LocalFsBlobStore();
LocalFsBlobStore spy = spy(bs);
Mockito.doReturn(true).when(spy).checkForBlobOrDownload(key1);
[4/6] storm git commit: revert backpressure=false to true
Posted by kn...@apache.org.
revert backpressure=false to true
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd83da80
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd83da80
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd83da80
Branch: refs/heads/master
Commit: fd83da80460c70db1eed127d2a977ff7b7d9110c
Parents: a710bb6
Author: Sanket <sc...@untilservice-lm>
Authored: Mon Dec 14 23:27:26 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Mon Dec 14 23:27:26 2015 -0600
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fd83da80/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8d9e3a3..b7c4677 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -179,7 +179,7 @@ task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
# now should be null by default
-topology.backpressure.enable: false
+topology.backpressure.enable: true
backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4
[2/6] storm git commit: removing close method as shutdown does that
Posted by kn...@apache.org.
removing close method as shutdown does that
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c511a7e5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c511a7e5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c511a7e5
Branch: refs/heads/master
Commit: c511a7e5581c2aff5c4ddd25c5507950fea08005
Parents: f27e5bc
Author: Sanket <sc...@untilservice-lm>
Authored: Tue Dec 8 17:12:02 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Tue Dec 8 17:12:02 2015 -0600
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c511a7e5/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
index 77845b0..7f075a1 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -268,7 +268,6 @@ public class LocalFsBlobStore extends BlobStore {
SettableBlobMeta meta = getStoredBlobMeta(key);
_aclHandler.hasPermissions(meta.get_acl(), READ, who, key);
if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + key) == null) {
- zkClient.close();
return 0;
}
replicationCount = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + key).size();
[5/6] storm git commit: Merge branch 'zk-slowing-down' of https://github.com/redsanket/storm
Posted by kn...@apache.org.
Merge branch 'zk-slowing-down' of https://github.com/redsanket/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/27809a17
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/27809a17
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/27809a17
Branch: refs/heads/master
Commit: 27809a1794e62038723e08b373a55bc93f4744e6
Parents: f46fec3 fd83da8
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Dec 17 15:08:11 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Dec 17 15:08:11 2015 -0600
----------------------------------------------------------------------
log4j2/worker.xml | 8 ++---
.../storm/blobstore/BlobSynchronizer.java | 2 +-
.../storm/blobstore/KeySequenceNumber.java | 2 +-
.../storm/blobstore/LocalFsBlobStore.java | 38 +++++++-------------
.../test/clj/backtype/storm/nimbus_test.clj | 5 +--
.../backtype/storm/localizer/LocalizerTest.java | 4 +--
6 files changed, 23 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/27809a17/storm-core/test/jvm/backtype/storm/localizer/LocalizerTest.java
----------------------------------------------------------------------
[6/6] storm git commit: Adding STORM-1376 to CHANGELOG
Posted by kn...@apache.org.
Adding STORM-1376 to CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cefc8430
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cefc8430
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cefc8430
Branch: refs/heads/master
Commit: cefc843099cc6f7cf6bfd89bb9135e0263a4c0d4
Parents: 27809a1
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Thu Dec 17 15:09:06 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Thu Dec 17 15:09:06 2015 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cefc8430/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c2d4a4b..0709964 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1376: Performance slowdown due excessive zk connections and log-debugging
* STORM-1395: Move JUnit dependency to top-level pom
* STORM-1372: Merging design and usage documents for distcache
* STORM-1393: Update the storm.log.dir function, add doc for logs
[3/6] storm git commit: removed immediateFlush = true and added synchronization for potential race conditions between supervisor download and blobstore calls
Posted by kn...@apache.org.
removed immediateFlush = true and added synchronization for potential race conditions between supervisor download and blobstore calls
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a710bb66
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a710bb66
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a710bb66
Branch: refs/heads/master
Commit: a710bb6615c16cb2787ac460b00af33267e888bd
Parents: c511a7e
Author: Sanket <sc...@untilservice-lm>
Authored: Sun Dec 13 17:41:40 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Sun Dec 13 17:41:40 2015 -0600
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
log4j2/worker.xml | 8 ++++----
.../src/jvm/backtype/storm/blobstore/BlobSynchronizer.java | 2 +-
.../src/jvm/backtype/storm/blobstore/KeySequenceNumber.java | 2 +-
.../src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java | 4 ++--
5 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b7c4677..8d9e3a3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -179,7 +179,7 @@ task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
# now should be null by default
-topology.backpressure.enable: true
+topology.backpressure.enable: false
backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/log4j2/worker.xml
----------------------------------------------------------------------
diff --git a/log4j2/worker.xml b/log4j2/worker.xml
index df368c6..967585b 100644
--- a/log4j2/worker.xml
+++ b/log4j2/worker.xml
@@ -22,7 +22,7 @@
<property name="patternNoTime">%msg%n</property>
</properties>
<appenders>
- <RollingFile name="A1" immediateFlush="false"
+ <RollingFile name="A1"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
<PatternLayout>
@@ -33,7 +33,7 @@
</Policies>
<DefaultRolloverStrategy max="9"/>
</RollingFile>
- <RollingFile name="STDOUT" immediateFlush="false"
+ <RollingFile name="STDOUT"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
<PatternLayout>
@@ -44,7 +44,7 @@
</Policies>
<DefaultRolloverStrategy max="4"/>
</RollingFile>
- <RollingFile name="STDERR" immediateFlush="false"
+ <RollingFile name="STDERR"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
<PatternLayout>
@@ -58,7 +58,7 @@
<Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
- messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFlush="false" immediateFail="true"/>
+ messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true"/>
</appenders>
<loggers>
<root level="info"> <!-- We log everything -->
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
index abd7c86..1f20d7c 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
-import java.util.Set;;
+import java.util.Set;
/**
* Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
index 1cddac0..9307993 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
@@ -130,7 +130,7 @@ public class KeySequenceNumber {
this.nimbusInfo = nimbusInfo;
}
- public int getKeySequenceNumber(Map conf) {
+ public synchronized int getKeySequenceNumber(Map conf) {
TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
try {
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
index 7f075a1..b8daad2 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -281,7 +281,7 @@ public class LocalFsBlobStore extends BlobStore {
}
//This additional check and download is for nimbus high availability in case you have more than one nimbus
- public boolean checkForBlobOrDownload(String key) {
+ public synchronized boolean checkForBlobOrDownload(String key) {
boolean checkBlobDownload = false;
try {
List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
@@ -301,7 +301,7 @@ public class LocalFsBlobStore extends BlobStore {
return checkBlobDownload;
}
- public void checkForBlobUpdate(String key) {
+ public synchronized void checkForBlobUpdate(String key) {
BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo);
}