You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/09/04 17:22:43 UTC

[storm] branch 2.1.x-branch updated: STORM-1515: Fix LocalState Corruption

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch 2.1.x-branch
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/2.1.x-branch by this push:
     new 0e5a68e  STORM-1515: Fix LocalState Corruption
0e5a68e is described below

commit 0e5a68e9d2e8d5e79a1af36203f52801ba6d4864
Author: Tim Frison <ti...@invidi.com>
AuthorDate: Tue Aug 13 11:28:31 2019 -0600

    STORM-1515: Fix LocalState Corruption
    
    When a Windows machine has a power failure, the local state file can
    become corrupted with repeated NUL characters. On restart, when the
    supervisor attempts to get the worker's heartbeat it will fail to
    deserialize the LocalStateData (because it is all NUL characters) and
    it will fail to start the workers.
---
 .../src/jvm/org/apache/storm/utils/LocalState.java |  5 +++
 .../test/java/org/apache/storm/LocalStateTest.java | 40 +++++++++++++++++-----
 2 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
index 6e1b44f..d5996d2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
@@ -32,6 +32,7 @@ import org.apache.storm.shade.org.apache.commons.io.FileUtils;
 import org.apache.storm.thrift.TBase;
 import org.apache.storm.thrift.TDeserializer;
 import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.protocol.TProtocolException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,6 +124,10 @@ public class LocalState {
             } catch (Exception e) {
                 attempts++;
                 if (attempts >= 10) {
+                    if (e.getCause() instanceof TProtocolException) {
+                        LOG.warn("LocalState file is corrupted, resetting state.", e);
+                        return new HashMap<>();
+                    }
                     throw new RuntimeException(e);
                 }
             }
diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
index 97765b2..2a14857 100644
--- a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
+++ b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
@@ -57,15 +57,37 @@ public class LocalStateTest {
 
     @Test
     public void testEmptyState() throws IOException {
-        TmpPath tmp_dir = new TmpPath();
-        String dir = tmp_dir.getPath();
-        LocalState ls = new LocalState(dir, true);
-        GlobalStreamId gs_a = new GlobalStreamId("a", "a");
-        FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"));
-        FileOutputStream version = FileUtils.openOutputStream(new File(dir, "12345.version"));
-        Assert.assertNull(ls.get("c"));
-        ls.put("a", gs_a);
-        Assert.assertEquals(gs_a, ls.get("a"));
+        try (TmpPath tmp_dir = new TmpPath()) {
+            GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+
+            String dir = tmp_dir.getPath();
+            LocalState ls = new LocalState(dir, true);
+
+            FileUtils.touch(new File(dir, "12345"));
+            FileUtils.touch(new File(dir, "12345.version"));
+
+            Assert.assertNull(ls.get("c"));
+            ls.put("a", globalStreamId_a);
+            Assert.assertEquals(globalStreamId_a, ls.get("a"));
+        }
+    }
 
+    @Test
+    public void testAllNulState() throws IOException {
+        try (TmpPath tmp_dir = new TmpPath()) {
+            GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+
+            String dir = tmp_dir.getPath();
+            LocalState ls = new LocalState(dir, true);
+
+            FileUtils.touch(new File(dir, "12345.version"));
+
+            try (FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"))) {
+                Assert.assertNull(ls.get("c"));
+                data.write(new byte[100]);
+                ls.put("a", globalStreamId_a);
+                Assert.assertEquals(globalStreamId_a, ls.get("a"));
+            }
+        }
     }
 }