You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/09/04 17:21:53 UTC

[storm] branch master updated: STORM-1515: Fix LocalState Corruption

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new d212b59  STORM-1515: Fix LocalState Corruption
     new 23cd491  Merge pull request #3101 from frison/STORM-1515
d212b59 is described below

commit d212b59cd05d2c67a9b09d0c6a98d241f77824fb
Author: Tim Frison <ti...@invidi.com>
AuthorDate: Tue Aug 13 11:28:31 2019 -0600

    STORM-1515: Fix LocalState Corruption
    
    When a Windows machine has a power failure, the local state file can
    become corrupted with repeated NUL characters. On restart, when the
    supervisor attempts to get the worker's heartbeat, it will fail to
    deserialize the LocalStateData (because it is all NUL characters) and
    it will fail to start the workers.
---
 .../src/jvm/org/apache/storm/utils/LocalState.java |  5 +++
 .../test/java/org/apache/storm/LocalStateTest.java | 40 +++++++++++++++++-----
 2 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
index 6e1b44f..d5996d2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
@@ -32,6 +32,7 @@ import org.apache.storm.shade.org.apache.commons.io.FileUtils;
 import org.apache.storm.thrift.TBase;
 import org.apache.storm.thrift.TDeserializer;
 import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.protocol.TProtocolException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,6 +124,10 @@ public class LocalState {
             } catch (Exception e) {
                 attempts++;
                 if (attempts >= 10) {
+                    if (e.getCause() instanceof TProtocolException) {
+                        LOG.warn("LocalState file is corrupted, resetting state.", e);
+                        return new HashMap<>();
+                    }
                     throw new RuntimeException(e);
                 }
             }
diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
index 97765b2..2a14857 100644
--- a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
+++ b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
@@ -57,15 +57,37 @@ public class LocalStateTest {
 
     @Test
     public void testEmptyState() throws IOException {
-        TmpPath tmp_dir = new TmpPath();
-        String dir = tmp_dir.getPath();
-        LocalState ls = new LocalState(dir, true);
-        GlobalStreamId gs_a = new GlobalStreamId("a", "a");
-        FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"));
-        FileOutputStream version = FileUtils.openOutputStream(new File(dir, "12345.version"));
-        Assert.assertNull(ls.get("c"));
-        ls.put("a", gs_a);
-        Assert.assertEquals(gs_a, ls.get("a"));
+        try (TmpPath tmp_dir = new TmpPath()) {
+            GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+
+            String dir = tmp_dir.getPath();
+            LocalState ls = new LocalState(dir, true);
+
+            FileUtils.touch(new File(dir, "12345"));
+            FileUtils.touch(new File(dir, "12345.version"));
+
+            Assert.assertNull(ls.get("c"));
+            ls.put("a", globalStreamId_a);
+            Assert.assertEquals(globalStreamId_a, ls.get("a"));
+        }
+    }
 
+    @Test
+    public void testAllNulState() throws IOException {
+        try (TmpPath tmp_dir = new TmpPath()) {
+            GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+
+            String dir = tmp_dir.getPath();
+            LocalState ls = new LocalState(dir, true);
+
+            FileUtils.touch(new File(dir, "12345.version"));
+
+            try (FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"))) {
+                Assert.assertNull(ls.get("c"));
+                data.write(new byte[100]);
+                ls.put("a", globalStreamId_a);
+                Assert.assertEquals(globalStreamId_a, ls.get("a"));
+            }
+        }
     }
 }