You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/12/17 22:09:22 UTC
[3/6] storm git commit: removed immediateFlush = true and added
synchronization for potential race conditions between supervisor download and
blobstore calls
removed immediateFlush = true and added synchronization for potential race conditions between supervisor download and blobstore calls
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a710bb66
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a710bb66
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a710bb66
Branch: refs/heads/master
Commit: a710bb6615c16cb2787ac460b00af33267e888bd
Parents: c511a7e
Author: Sanket <sc...@untilservice-lm>
Authored: Sun Dec 13 17:41:40 2015 -0600
Committer: Sanket <sc...@untilservice-lm>
Committed: Sun Dec 13 17:41:40 2015 -0600
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
log4j2/worker.xml | 8 ++++----
.../src/jvm/backtype/storm/blobstore/BlobSynchronizer.java | 2 +-
.../src/jvm/backtype/storm/blobstore/KeySequenceNumber.java | 2 +-
.../src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java | 4 ++--
5 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b7c4677..8d9e3a3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -179,7 +179,7 @@ task.refresh.poll.secs: 10
task.credentials.poll.secs: 30
# now should be null by default
-topology.backpressure.enable: true
+topology.backpressure.enable: false
backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/log4j2/worker.xml
----------------------------------------------------------------------
diff --git a/log4j2/worker.xml b/log4j2/worker.xml
index df368c6..967585b 100644
--- a/log4j2/worker.xml
+++ b/log4j2/worker.xml
@@ -22,7 +22,7 @@
<property name="patternNoTime">%msg%n</property>
</properties>
<appenders>
- <RollingFile name="A1" immediateFlush="false"
+ <RollingFile name="A1"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
<PatternLayout>
@@ -33,7 +33,7 @@
</Policies>
<DefaultRolloverStrategy max="9"/>
</RollingFile>
- <RollingFile name="STDOUT" immediateFlush="false"
+ <RollingFile name="STDOUT"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
<PatternLayout>
@@ -44,7 +44,7 @@
</Policies>
<DefaultRolloverStrategy max="4"/>
</RollingFile>
- <RollingFile name="STDERR" immediateFlush="false"
+ <RollingFile name="STDERR"
fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
<PatternLayout>
@@ -58,7 +58,7 @@
<Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
- messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFlush="false" immediateFail="true"/>
+ messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true"/>
</appenders>
<loggers>
<root level="info"> <!-- We log everything -->
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
index abd7c86..1f20d7c 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobSynchronizer.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
-import java.util.Set;;
+import java.util.Set;
/**
* Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
index 1cddac0..9307993 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/KeySequenceNumber.java
@@ -130,7 +130,7 @@ public class KeySequenceNumber {
this.nimbusInfo = nimbusInfo;
}
- public int getKeySequenceNumber(Map conf) {
+ public synchronized int getKeySequenceNumber(Map conf) {
TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
try {
http://git-wip-us.apache.org/repos/asf/storm/blob/a710bb66/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
index 7f075a1..b8daad2 100644
--- a/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/backtype/storm/blobstore/LocalFsBlobStore.java
@@ -281,7 +281,7 @@ public class LocalFsBlobStore extends BlobStore {
}
//This additional check and download is for nimbus high availability in case you have more than one nimbus
- public boolean checkForBlobOrDownload(String key) {
+ public synchronized boolean checkForBlobOrDownload(String key) {
boolean checkBlobDownload = false;
try {
List<String> keyList = BlobStoreUtils.getKeyListFromBlobStore(this);
@@ -301,7 +301,7 @@ public class LocalFsBlobStore extends BlobStore {
return checkBlobDownload;
}
- public void checkForBlobUpdate(String key) {
+ public synchronized void checkForBlobUpdate(String key) {
BlobStoreUtils.updateKeyForBlobStore(conf, this, zkClient, key, nimbusInfo);
}