You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/09/04 17:21:53 UTC
[storm] branch master updated: STORM-1515: Fix LocalState Corruption
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new d212b59 STORM-1515: Fix LocalState Corruption
new 23cd491 Merge pull request #3101 from frison/STORM-1515
d212b59 is described below
commit d212b59cd05d2c67a9b09d0c6a98d241f77824fb
Author: Tim Frison <ti...@invidi.com>
AuthorDate: Tue Aug 13 11:28:31 2019 -0600
STORM-1515: Fix LocalState Corruption
When a Windows machine has a power failure, the local state file can
become corrupted with repeated NUL characters. On restart, when the
supervisor attempts to get the worker's heartbeat, it will fail to
deserialize the LocalStateData (because it is all NUL characters) and
will therefore fail to start the workers.
---
.../src/jvm/org/apache/storm/utils/LocalState.java | 5 +++
.../test/java/org/apache/storm/LocalStateTest.java | 40 +++++++++++++++++-----
2 files changed, 36 insertions(+), 9 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
index 6e1b44f..d5996d2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
@@ -32,6 +32,7 @@ import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.thrift.TBase;
import org.apache.storm.thrift.TDeserializer;
import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.protocol.TProtocolException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,6 +124,10 @@ public class LocalState {
} catch (Exception e) {
attempts++;
if (attempts >= 10) {
+ if (e.getCause() instanceof TProtocolException) {
+ LOG.warn("LocalState file is corrupted, resetting state.", e);
+ return new HashMap<>();
+ }
throw new RuntimeException(e);
}
}
diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
index 97765b2..2a14857 100644
--- a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
+++ b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
@@ -57,15 +57,37 @@ public class LocalStateTest {
@Test
public void testEmptyState() throws IOException {
- TmpPath tmp_dir = new TmpPath();
- String dir = tmp_dir.getPath();
- LocalState ls = new LocalState(dir, true);
- GlobalStreamId gs_a = new GlobalStreamId("a", "a");
- FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"));
- FileOutputStream version = FileUtils.openOutputStream(new File(dir, "12345.version"));
- Assert.assertNull(ls.get("c"));
- ls.put("a", gs_a);
- Assert.assertEquals(gs_a, ls.get("a"));
+ try (TmpPath tmp_dir = new TmpPath()) {
+ GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+
+ String dir = tmp_dir.getPath();
+ LocalState ls = new LocalState(dir, true);
+
+ FileUtils.touch(new File(dir, "12345"));
+ FileUtils.touch(new File(dir, "12345.version"));
+
+ Assert.assertNull(ls.get("c"));
+ ls.put("a", globalStreamId_a);
+ Assert.assertEquals(globalStreamId_a, ls.get("a"));
+ }
+ }
+ @Test
+ public void testAllNulState() throws IOException {
+ try (TmpPath tmp_dir = new TmpPath()) {
+ GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+
+ String dir = tmp_dir.getPath();
+ LocalState ls = new LocalState(dir, true);
+
+ FileUtils.touch(new File(dir, "12345.version"));
+
+ try (FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"))) {
+ Assert.assertNull(ls.get("c"));
+ data.write(new byte[100]);
+ ls.put("a", globalStreamId_a);
+ Assert.assertEquals(globalStreamId_a, ls.get("a"));
+ }
+ }
}
}