You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/05/05 14:14:08 UTC

[storm] 01/04: STORM-3376: Set Server callback before opening Netty socket, so we don't drop messages during startup

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit aaf1113360ce151d86948860c2f290befa79abb8
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Fri Apr 12 20:25:42 2019 +0200

    STORM-3376: Set Server callback before opening Netty socket, so we don't drop messages during startup
---
 integration-test/config/storm.yaml                 |  6 ++
 .../test/org/apache/storm/sql/TestStormSql.java    | 45 +++++-----
 .../backends/streams/TestPlanCompiler.java         | 96 ++++++++++-----------
 .../src/test/org/apache/storm/sql/TestUtils.java   | 23 ++---
 .../org/apache/storm/cluster/IStateStorage.java    |  4 +-
 .../storm/cluster/StormClusterStateImpl.java       |  8 +-
 .../jvm/org/apache/storm/daemon/worker/Worker.java |  4 +-
 .../apache/storm/daemon/worker/WorkerState.java    | 40 ++++-----
 .../org/apache/storm/messaging/IConnection.java    | 14 ----
 .../jvm/org/apache/storm/messaging/IContext.java   |  5 +-
 .../org/apache/storm/messaging/local/Context.java  | 98 ++++++++++------------
 .../org/apache/storm/messaging/netty/Client.java   | 15 ----
 .../org/apache/storm/messaging/netty/Context.java  |  6 +-
 .../org/apache/storm/messaging/netty/Server.java   | 29 +++----
 .../apache/storm/security/auth/ThriftServer.java   |  8 +-
 .../auth/digest/DigestSaslTransportPlugin.java     | 11 ++-
 .../auth/kerberos/KerberosSaslTransportPlugin.java | 11 ++-
 .../security/auth/sasl/SaslTransportPlugin.java    | 11 ++-
 .../auth/workertoken/WorkerTokenAuthorizer.java    | 13 ++-
 .../apache/storm/messaging/netty/NettyTest.java    | 28 +++----
 .../main/java/org/apache/storm/LocalCluster.java   |  6 +-
 .../apache/storm/blobstore/LocalFsBlobStore.java   |  1 +
 .../org/apache/storm/daemon/supervisor/Slot.java   |  2 +-
 .../org/apache/storm/localizer/AsyncLocalizer.java |  4 +-
 .../security/auth/workertoken/WorkerTokenTest.java | 24 +++---
 25 files changed, 247 insertions(+), 265 deletions(-)

diff --git a/integration-test/config/storm.yaml b/integration-test/config/storm.yaml
index 67784e7..774497f 100644
--- a/integration-test/config/storm.yaml
+++ b/integration-test/config/storm.yaml
@@ -35,3 +35,9 @@ drpc.servers:
   - "node1"
 
 supervisor.slots.ports: [6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 6709]
+
+# Enable assertions
+nimbus.childopts: "-Xmx1024m -ea"
+supervisor.childopts: "-Xmx256m -ea"
+worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump -ea"
+ui.childopts: "-Xmx768m -ea"
\ No newline at end of file
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 5a30764..3958894 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -29,28 +29,24 @@ import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.tuple.Values;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(TestUtils.MockBoltExtension.class)
+@ExtendWith(TestUtils.MockInsertBoltExtension.class)
 public class TestStormSql {
 
     public static final int WAIT_TIMEOUT_MS = 1000 * 1000;
     public static final int WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED = 1000 * 10;
     public static final int WAIT_TIMEOUT_MS_ERROR_EXPECTED = 1000;
 
-    @Rule
-    public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
-
-    @Rule
-    public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
-
     private static LocalCluster cluster;
 
-    @BeforeClass
+    @BeforeAll
     public static void staticSetup() throws Exception {
         DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
         DataSourcesRegistry.providerMap().put("mocknested", new MockNestedDataSourceProvider());
@@ -61,7 +57,7 @@ public class TestStormSql {
         cluster = new LocalCluster();
     }
 
-    @AfterClass
+    @AfterAll
     public static void staticCleanup() {
         DataSourcesRegistry.providerMap().remove("mock");
         DataSourcesRegistry.providerMap().remove("mocknested");
@@ -176,7 +172,7 @@ public class TestStormSql {
         Assert.assertEquals(0, values.size());
     }
 
-    @Test(expected = ValidationException.class)
+    @Test
     public void testExternalUdfType() throws Exception {
         List<String> stmt = new ArrayList<>();
         stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
@@ -185,12 +181,11 @@ public class TestStormSql {
         stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
-        Assert.fail("Should raise ValidationException.");
+        Assertions.assertThrows(ValidationException.class,
+            () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
     }
 
-    @Test(expected = CompilingClassLoader.CompilerException.class)
+    @Test
     public void testExternalUdfType2() throws Exception {
         List<String> stmt = new ArrayList<>();
         // generated code will be not compilable since return type of MYPLUS and type of 'x' are different
@@ -200,9 +195,8 @@ public class TestStormSql {
         stmt.add("INSERT INTO BAR SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
-        Assert.fail("Should raise CompilerException.");
+        Assertions.assertThrows(CompilingClassLoader.CompilerException.class,
+            () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
     }
 
     @Test
@@ -222,7 +216,7 @@ public class TestStormSql {
         Assert.assertEquals(5, values.get(1).getFirst());
     }
 
-    @Test(expected = UnsupportedOperationException.class)
+    @Test
     public void testExternalUdfUsingJar() throws Exception {
         List<String> stmt = new ArrayList<>();
         stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
@@ -231,9 +225,8 @@ public class TestStormSql {
         stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
-        Assert.fail("Should raise UnsupportedOperationException.");
+        Assertions.assertThrows(UnsupportedOperationException.class,
+            () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
     }
 
     private static class MockDataSourceProvider implements DataSourcesProvider {
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
index 68203d8..21bc6f3 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
@@ -33,35 +33,31 @@ import org.apache.storm.sql.planner.streams.QueryPlanner;
 import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.tuple.Values;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
 
 import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(TestUtils.MockBoltExtension.class)
+@ExtendWith(TestUtils.MockInsertBoltExtension.class)
 public class TestPlanCompiler {
-    private static LocalCluster cluster;
 
-    @Rule
-    public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
-
-    @Rule
-    public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
+    private static LocalCluster cluster;
 
-    @BeforeClass
+    @BeforeAll
     public static void staticSetup() throws Exception {
         cluster = new LocalCluster();
     }
 
-    @AfterClass
+    @AfterAll
     public static void staticCleanup() {
-        if (cluster!= null) {
+        if (cluster != null) {
             cluster.shutdown();
             cluster = null;
         }
@@ -82,7 +78,7 @@ public class TestPlanCompiler {
         final StormTopology topo = proc.build();
 
         SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
-        Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, TestUtils.MockBolt.getCollectedValues().toArray());
+        Assert.assertArrayEquals(new Values[]{new Values(3), new Values(4)}, TestUtils.MockBolt.getCollectedValues().toArray());
     }
 
     @Test
@@ -99,15 +95,15 @@ public class TestPlanCompiler {
         final StormTopology topo = proc.build();
 
         SqlTestUtil.runStormTopology(cluster, TestUtils.MockInsertBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
-        Assert.assertArrayEquals(new Pair[] { Pair.of(4, new Values(4, "abcde", "y")) }, TestUtils.MockInsertBolt.getCollectedValues().toArray());
+        Assert.assertArrayEquals(new Pair[]{Pair.of(4, new Values(4, "abcde", "y"))}, TestUtils.MockInsertBolt.getCollectedValues().toArray());
     }
 
     @Test
     public void testUdf() throws Exception {
         int EXPECTED_VALUE_SIZE = 1;
-        String sql = "SELECT MYPLUS(ID, 3)" +
-                "FROM FOO " +
-                "WHERE ID = 2";
+        String sql = "SELECT MYPLUS(ID, 3)"
+            + "FROM FOO "
+            + "WHERE ID = 2";
         TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
         Map<String, ISqlStreamsDataSource> data = new HashMap<>();
         data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
@@ -119,15 +115,15 @@ public class TestPlanCompiler {
         final StormTopology topo = proc.build();
 
         SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
-        Assert.assertArrayEquals(new Values[] { new Values(5) }, TestUtils.MockBolt.getCollectedValues().toArray());
+        Assert.assertArrayEquals(new Values[]{new Values(5)}, TestUtils.MockBolt.getCollectedValues().toArray());
     }
 
     @Test
     public void testNested() throws Exception {
         int EXPECTED_VALUE_SIZE = 1;
-        String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
-                "FROM FOO " +
-                "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+        String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD "
+            + "FROM FOO "
+            + "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
         TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
 
         final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -144,23 +140,23 @@ public class TestPlanCompiler {
         Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
         Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
         Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))},
-                TestUtils.MockBolt.getCollectedValues().toArray());
+            TestUtils.MockBolt.getCollectedValues().toArray());
     }
 
     /**
-     * All the binary literal tests are done here, because Avatica converts the result to byte[]
-     * whereas Stream provides the result to ByteString which makes different semantic from Stream implementation.
+     * All the binary literal tests are done here, because Avatica converts the result to byte[] whereas Stream provides the result to
+     * ByteString which makes different semantic from Stream implementation.
      */
     @Test
     public void testBinaryStringFunctions() throws Exception {
         int EXPECTED_VALUE_SIZE = 1;
-        String sql = "SELECT x'45F0AB' || x'45F0AB', " +
-                "POSITION(x'F0' IN x'453423F0ABBC'), " +
-                "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), " +
-                "SUBSTRING(x'453423F0ABBC' FROM 3), " +
-                "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) " +
-                "FROM FOO " +
-                "WHERE ID > 0 AND ID < 2";
+        String sql = "SELECT x'45F0AB' || x'45F0AB', "
+            + "POSITION(x'F0' IN x'453423F0ABBC'), "
+            + "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), "
+            + "SUBSTRING(x'453423F0ABBC' FROM 3), "
+            + "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) "
+            + "FROM FOO "
+            + "WHERE ID > 0 AND ID < 2";
 
         TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
         final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -190,19 +186,19 @@ public class TestPlanCompiler {
     @Test
     public void testDateKeywordsAndFunctions() throws Exception {
         int EXPECTED_VALUE_SIZE = 1;
-        String sql = "SELECT " +
-                "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, " +
-                "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, " +
-                "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')," +
-                "FLOOR(DATE '2016-01-23' TO MONTH)," +
-                "CEIL(TIME '12:34:56' TO MINUTE)," +
-                "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP," +
-                "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}," +
-                "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}," +
-                "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, " +
-                "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field "   +
-                "FROM FOO " +
-                "WHERE ID > 0 AND ID < 2";
+        String sql = "SELECT "
+            + "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, "
+            + "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, "
+            + "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56'),"
+            + "FLOOR(DATE '2016-01-23' TO MONTH),"
+            + "CEIL(TIME '12:34:56' TO MINUTE),"
+            + "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP,"
+            + "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')},"
+            + "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')},"
+            + "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, "
+            + "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field "
+            + "FROM FOO "
+            + "WHERE ID > 0 AND ID < 2";
         TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
 
         final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -229,7 +225,7 @@ public class TestPlanCompiler {
         int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY);
 
         Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt,
-                        134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)},
-                TestUtils.MockBolt.getCollectedValues().toArray());
+            134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)},
+            TestUtils.MockBolt.getCollectedValues().toArray());
     }
-}
\ No newline at end of file
+}
diff --git a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index f728bf6..a1aa552 100644
--- a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -39,22 +39,23 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.junit.rules.ExternalResource;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
 
 public class TestUtils {
-  public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() {
-    @Override
-    protected void before() throws Throwable {
-      MockInsertBolt.getCollectedValues().clear();
+    public static final class MockInsertBoltExtension implements BeforeEachCallback {
+        @Override
+        public void beforeEach(ExtensionContext ctx) throws Exception {
+            MockInsertBolt.getCollectedValues().clear();
+        }
     }
-  };
 
-  public static final ExternalResource mockBoltValueResource = new ExternalResource() {
-    @Override
-    protected void before() throws Throwable {
-      MockBolt.getCollectedValues().clear();
+    public static final class MockBoltExtension implements BeforeEachCallback {
+        @Override
+        public void beforeEach(ExtensionContext ctx) throws Exception {
+            MockBolt.getCollectedValues().clear();
+        }
     }
-  };
 
   public static class MyPlus {
     public static Integer evaluate(Integer x, Integer y) {
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
index 0889a26..b673932 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.cluster;
 
+import java.io.Closeable;
 import java.util.List;
 import org.apache.storm.callback.ZKStateChangedCallback;
 import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener;
@@ -27,7 +28,7 @@ import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
  * For example, performing these two calls: set_data("/path", data, acls); void set_worker_hb("/path", heartbeat, acls); may or may not
  * cause a collision in "/path". Never use the same paths with the *_hb* methods as you do with the others.
  */
-public interface IStateStorage {
+public interface IStateStorage extends Closeable {
 
     /**
      * Registers a callback function that gets called when CuratorEvents happen.
@@ -115,6 +116,7 @@ public interface IStateStorage {
     /**
      * Close the connection to the data store.
      */
+    @Override
     void close();
 
     /**
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 644f465..f330278 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -56,7 +56,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
     private final List<ACL> defaultAcls;
     private final String stateId;
-    private final boolean solo;
+    private final boolean shouldCloseStateStorageOnDisconnect;
     private final ClusterStateContext context;
     private IStateStorage stateStorage;
     private ILocalAssignmentsBackend assignmentsBackend;
@@ -74,10 +74,10 @@ public class StormClusterStateImpl implements IStormClusterState {
     private ConcurrentHashMap<String, Runnable> logConfigCallback;
 
     public StormClusterStateImpl(IStateStorage StateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend,
-                                 ClusterStateContext context, boolean solo) throws Exception {
+                                 ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception {
 
         this.stateStorage = StateStorage;
-        this.solo = solo;
+        this.shouldCloseStateStorageOnDisconnect = shouldCloseStateStorageOnDisconnect;
         this.defaultAcls = context.getDefaultZkAcls();
         this.context = context;
         this.assignmentsBackend = assignmentsassignmentsBackend;
@@ -831,7 +831,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     @Override
     public void disconnect() {
         stateStorage.unregister(stateId);
-        if (solo) {
+        if (shouldCloseStateStorageOnDisconnect) {
             stateStorage.close();
             this.assignmentsBackend.close();
         }
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 05a79c6..e950c051 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -197,9 +197,7 @@ public class Worker implements Shutdownable, DaemonCommon {
             .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
                                Worker.this::doExecutorHeartbeats);
 
-        workerState.registerCallbacks();
-
-        workerState.refreshConnections(null);
+        workerState.refreshConnections();
 
         workerState.activateWorkerWhenAllConnectionsReady();
 
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 0c13bec..2aa96a9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.StormTimer;
@@ -57,6 +58,7 @@ import org.apache.storm.hooks.IWorkerHook;
 import org.apache.storm.messaging.ConnectionWithStatus;
 import org.apache.storm.messaging.DeserializingConnectionCallback;
 import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IConnectionCallback;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.messaging.TransportFactory;
 import org.apache.storm.messaging.netty.BackPressureStatus;
@@ -158,7 +160,6 @@ public class WorkerState {
         this.conf = conf;
         this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
         this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
-        this.receiver = this.mqContext.bind(topologyId, port);
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
         this.supervisorPort = supervisorPort;
@@ -215,6 +216,16 @@ public class WorkerState {
         this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);
         this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue);
         this.deserializedWorkerHooks = deserializeWorkerHooks();
+        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
+        IConnectionCallback cb = new DeserializingConnectionCallback(topologyConf,
+            getWorkerTopologyContext(),
+            this::transferLocalBatch);
+        Supplier<Object> newConnectionResponse = () -> {
+            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+            LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
+            return bpStatus;
+        };
+        this.receiver = this.mqContext.bind(topologyId, port, cb, newConnectionResponse);
     }
 
     private static double getQueueLoad(JCQueue q) {
@@ -356,19 +367,11 @@ public class WorkerState {
         return userTimer;
     }
 
-    public void refreshConnections() {
-        try {
-            refreshConnections(() -> refreshConnectionsTimer.schedule(0, this::refreshConnections));
-        } catch (Exception e) {
-            throw Utils.wrapInRuntime(e);
-        }
-    }
-
     public SmartThread makeTransferThread() {
         return workerTransfer.makeTransferThread();
     }
 
-    public void refreshConnections(Runnable callback) throws Exception {
+    public void refreshConnections() {
         Assignment assignment = getLocalAssignment(conf, stormClusterState, topologyId);
 
         Set<NodeInfo> neededConnections = new HashSet<>();
@@ -497,21 +500,6 @@ public class WorkerState {
         );
     }
 
-    public void registerCallbacks() {
-        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
-        receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
-                                                                  getWorkerTopologyContext(),
-                                                                  this::transferLocalBatch));
-        // Send curr BackPressure status to new clients
-        receiver.registerNewConnectionResponse(
-            () -> {
-                BackPressureStatus bpStatus = bpTracker.getCurrStatus();
-                LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
-                return bpStatus;
-            }
-        );
-    }
-
     /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
     public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
         return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
@@ -576,7 +564,7 @@ public class WorkerState {
         }
     }
 
-    public WorkerTopologyContext getWorkerTopologyContext() {
+    public final WorkerTopologyContext getWorkerTopologyContext() {
         try {
             String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
             String pidDir = ConfigUtils.workerPidsRoot(conf, topologyId);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
index c2e156c..f713c7f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -22,20 +22,6 @@ import org.apache.storm.messaging.netty.BackPressureStatus;
 public interface IConnection extends AutoCloseable {
 
     /**
-     * Register a callback to be notified when data is ready to be processed.
-     *
-     * @param cb the callback to process the messages.
-     */
-    void registerRecv(IConnectionCallback cb);
-
-    /**
-     * Register a response generator to be used to send an initial response when a new client connects.
-     *
-     * @param cb the callback to process the connection.
-     */
-    void registerNewConnectionResponse(Supplier<Object> cb);
-
-    /**
      * Send load metrics to all downstream connections.
      *
      * @param taskToLoad a map from the task id to the load for that task.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index 8d2c0dc..057ae30 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -14,6 +14,7 @@ package org.apache.storm.messaging;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 /**
  * This interface needs to be implemented for messaging plugin.
@@ -41,9 +42,11 @@ public interface IContext {
      *
      * @param storm_id topology ID
      * @param port     port #
+     * @param cb The callback to deliver received messages to
+     * @param newConnectionResponse Supplier of the initial message to send to new client connections
      * @return server side connection
      */
-    IConnection bind(String storm_id, int port);
+    IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse);
 
     /**
      * This method establish a client side connection to a remote server
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
index 6071cbe..2737dfb 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -15,6 +15,7 @@ package org.apache.storm.messaging.local;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -37,36 +38,36 @@ import org.slf4j.LoggerFactory;
 
 public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
-    private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
-
-    private static LocalServer getLocalServer(String nodeId, int port) {
-        String key = nodeId + "-" + port;
-        LocalServer ret = _registry.get(key);
-        if (ret == null) {
-            ret = new LocalServer(port);
-            LocalServer tmp = _registry.putIfAbsent(key, ret);
-            if (tmp != null) {
-                ret = tmp;
-            }
+    private final ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
+
+    private static String getNodeKey(String nodeId, int port) {
+        return nodeId + "-" + port;
+    }
+    
+    private LocalServer createLocalServer(String nodeId, int port, IConnectionCallback cb) {
+        String key = getNodeKey(nodeId, port);
+        LocalServer ret = new LocalServer(port, cb);
+        LocalServer existing = _registry.put(key, ret);
+        if (existing != null) {
+            //Can happen if worker is restarted in the same topology, e.g. due to blob update
+            LOG.info("Replacing existing server {} with {} for key {}", existing, ret, key);
         }
         return ret;
     }
 
-    ;
-
     @Override
     public void prepare(Map<String, Object> topoConf) {
         //NOOP
     }
 
     @Override
-    public IConnection bind(String storm_id, int port) {
-        return getLocalServer(storm_id, port);
+    public IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
+        return createLocalServer(storm_id, port, cb);
     }
 
     @Override
     public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
-        return new LocalClient(getLocalServer(storm_id, port));
+        return new LocalClient(storm_id, port);
     }
 
     @Override
@@ -74,25 +75,16 @@ public class Context implements IContext {
         //NOOP
     }
 
-    private static class LocalServer implements IConnection {
+    private class LocalServer implements IConnection {
         final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
         final int port;
-        volatile IConnectionCallback _cb;
+        final IConnectionCallback _cb;
 
-        public LocalServer(int port) {
+        public LocalServer(int port, IConnectionCallback cb) {
             this.port = port;
+            this._cb = cb;
         }
-
-        @Override
-        public void registerRecv(IConnectionCallback cb) {
-            _cb = cb;
-        }
-
-        @Override
-        public void registerNewConnectionResponse(Supplier<Object> cb) {
-            return;
-        }
-
+        
         @Override
         public void send(Iterator<TaskMessage> msgs) {
             throw new IllegalArgumentException("SHOULD NOT HAPPEN");
@@ -131,14 +123,16 @@ public class Context implements IContext {
         }
     }
 
-    private static class LocalClient implements IConnection {
-        private final LocalServer _server;
+    private class LocalClient implements IConnection {
         //Messages sent before the server registered a callback
         private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
         private final ScheduledExecutorService _pendingFlusher;
+        private final int port;
+        private final String registryKey;
 
-        public LocalClient(LocalServer server) {
-            _server = server;
+        public LocalClient(String stormId, int port) {
+            this.port = port;
+            this.registryKey = getNodeKey(stormId, port);
             _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
             _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                 @Override
@@ -163,35 +157,26 @@ public class Context implements IContext {
             }, 5, 5, TimeUnit.SECONDS);
         }
 
-        @Override
-        public void registerRecv(IConnectionCallback cb) {
-            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
-        }
-
-        @Override
-        public void registerNewConnectionResponse(Supplier<Object> cb) {
-            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
-        }
-
         private void flushPending() {
-            IConnectionCallback serverCb = _server._cb;
-            if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
+            //Can't cache server in client, server can change when workers restart.
+            LocalServer server = _registry.get(registryKey);
+            if (server != null && !_pendingDueToUnregisteredServer.isEmpty()) {
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 _pendingDueToUnregisteredServer.drainTo(ret);
-                serverCb.recv(ret);
+                server._cb.recv(ret);
             }
         }
 
         @Override
         public void send(Iterator<TaskMessage> msgs) {
-            IConnectionCallback serverCb = _server._cb;
-            if (serverCb != null) {
+            LocalServer server = _registry.get(registryKey);
+            if (server != null) {
                 flushPending();
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 while (msgs.hasNext()) {
                     ret.add(msgs.next());
                 }
-                serverCb.recv(ret);
+                server._cb.recv(ret);
             } else {
                 while (msgs.hasNext()) {
                     _pendingDueToUnregisteredServer.add(msgs.next());
@@ -201,12 +186,19 @@ public class Context implements IContext {
 
         @Override
         public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
-            return _server.getLoad(tasks);
+            LocalServer server = _registry.get(registryKey);
+            if (server != null) {
+                return server.getLoad(tasks);
+            }
+            return Collections.emptyMap();
         }
 
         @Override
         public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
-            _server.sendLoadMetrics(taskToLoad);
+            LocalServer server = _registry.get(registryKey);
+            if (server != null) {
+                server.sendLoadMetrics(taskToLoad);
+            }
         }
 
         @Override
@@ -216,7 +208,7 @@ public class Context implements IContext {
 
         @Override
         public int getPort() {
-            return _server.getPort();
+            return port;
         }
 
         @Override
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index fe2fe16..61a9c99 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -236,21 +236,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         }
     }
 
-    /**
-     * Receiving messages is not supported by a client.
-     *
-     * @throws java.lang.UnsupportedOperationException whenever this method is being called.
-     */
-    @Override
-    public void registerRecv(IConnectionCallback cb) {
-        throw new UnsupportedOperationException("Client connection should not receive any messages");
-    }
-
-    @Override
-    public void registerNewConnectionResponse(Supplier<Object> cb) {
-        throw new UnsupportedOperationException("Client does not accept new connections");
-    }
-
     @Override
     public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
         throw new RuntimeException("Client connection should not send load metrics");
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 27ccd04..ca46c4f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -17,8 +17,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import org.apache.storm.Config;
 import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IConnectionCallback;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
 import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
@@ -53,8 +55,8 @@ public class Context implements IContext {
      * establish a server with a binding port
      */
     @Override
-    public synchronized IConnection bind(String storm_id, int port) {
-        Server server = new Server(topoConf, port);
+    public synchronized IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
+        Server server = new Server(topoConf, port, cb, newConnectionResponse);
         serverConnections.add(server);
         return server;
     }
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 7d150c3..a3cd8b0 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -62,14 +62,23 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     private final int port;
     private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
     private final KryoValuesSerializer ser;
+    private final IConnectionCallback cb;
+    private final Supplier<Object> newConnectionResponse;
     private volatile boolean closing = false;
-    private IConnectionCallback cb = null;
-    private Supplier<Object> newConnectionResponse;
 
-    Server(Map<String, Object> topoConf, int port) {
+    /**
+     * Starts Netty at the given port.
+     * @param topoConf The topology config
+     * @param port The port to start Netty at
+     * @param cb The callback to deliver incoming messages to
+     * @param newConnectionResponse The response to send to clients when they connect. Can be null.
+     */
+    Server(Map<String, Object> topoConf, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
         this.topoConf = topoConf;
         this.port = port;
         ser = new KryoValuesSerializer(topoConf);
+        this.cb = cb;
+        this.newConnectionResponse = newConnectionResponse;
 
         // Configure the server.
         int buffer_size = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -136,19 +145,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
             return;
         }
         addReceiveCount(from, msgs.size());
-        if (cb != null) {
-            cb.recv(msgs);
-        }
-    }
-
-    @Override
-    public void registerRecv(IConnectionCallback cb) {
-        this.cb = cb;
-    }
-
-    @Override
-    public void registerNewConnectionResponse(Supplier<Object> newConnectionResponse) {
-        this.newConnectionResponse = newConnectionResponse;
+        cb.recv(msgs);
     }
 
     @Override
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index eaeb17a..e83d37d 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -12,9 +12,11 @@
 
 package org.apache.storm.security.auth;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import javax.security.auth.login.Configuration;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
 import org.apache.storm.thrift.TProcessor;
 import org.apache.storm.thrift.server.TServer;
 import org.apache.storm.thrift.transport.TTransportException;
@@ -30,6 +32,7 @@ public class ThriftServer {
     private Configuration loginConf;
     private int port;
     private boolean areWorkerTokensSupported;
+    private ITransportPlugin transportPlugin;
 
     public ThriftServer(Map<String, Object> conf, TProcessor processor, ThriftConnectionType type) {
         this.conf = conf;
@@ -44,7 +47,7 @@ public class ThriftServer {
         }
         try {
             //locate our thrift transport plugin
-            ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
+            transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
             //server
             server = transportPlugin.getServer(this.processor);
             port = transportPlugin.getPort();
@@ -57,6 +60,9 @@ public class ThriftServer {
 
     public void stop() {
         server.stop();
+        if (transportPlugin instanceof SaslTransportPlugin) {
+            ((SaslTransportPlugin)transportPlugin).close();
+        }
     }
 
     /**
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index 54731ee..463b841 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -34,11 +34,15 @@ import org.slf4j.LoggerFactory;
 public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     public static final String DIGEST = "DIGEST-MD5";
     private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
+    private WorkerTokenAuthorizer workerTokenAuthorizer;
 
     protected TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
+        if (workerTokenAuthorizer == null) {
+            workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
+        }
         //create an authentication callback handler
         CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(impersonationAllowed,
-                                                                                    new WorkerTokenAuthorizer(conf, type),
+                                                                                    workerTokenAuthorizer,
                                                                                     new JassPasswordProvider(loginConf));
 
         //create a transport factory that will invoke our auth callback for digest
@@ -93,4 +97,9 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     public boolean areWorkerTokensSupported() {
         return true;
     }
+
+    @Override
+    public void close() {
+        workerTokenAuthorizer.close();
+    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 081037b..27ea878 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -50,9 +50,13 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
     private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
     private static final String DISABLE_LOGIN_CACHE = "disableLoginCache";
     private static Map<LoginCacheKey, Login> loginCache = new ConcurrentHashMap<>();
+    private WorkerTokenAuthorizer workerTokenAuthorizer;
 
     @Override
     public TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
+        if (workerTokenAuthorizer == null) {
+            workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
+        }
         //create an authentication callback handler
         CallbackHandler server_callback_handler = new ServerCallbackHandler(loginConf, impersonationAllowed);
 
@@ -91,7 +95,7 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
 
         //Also add in support for worker tokens
         factory.addServerDefinition(DIGEST, ClientAuthUtils.SERVICE, hostName, null,
-                                    new SimpleSaslServerCallbackHandler(impersonationAllowed, new WorkerTokenAuthorizer(conf, type)));
+                                    new SimpleSaslServerCallbackHandler(impersonationAllowed, workerTokenAuthorizer));
 
         //create a wrap transport factory so that we could apply user credential during connections
         TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
@@ -234,6 +238,11 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         return true;
     }
 
+    @Override
+    public void close() {
+        workerTokenAuthorizer.close();
+    }
+
     /**
      * A TransportFactory that wraps another one, but assumes a specified UGI before calling through.
      *
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
index d40ad6f..1694caa 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.security.auth.sasl;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.Socket;
 import java.security.Principal;
@@ -45,7 +46,7 @@ import org.apache.storm.utils.ExtendedThreadPoolExecutor;
 /**
  * Base class for SASL authentication plugin.
  */
-public abstract class SaslTransportPlugin implements ITransportPlugin {
+public abstract class SaslTransportPlugin implements ITransportPlugin, Closeable {
     protected ThriftConnectionType type;
     protected Map<String, Object> conf;
     protected Configuration loginConf;
@@ -82,9 +83,9 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         if (serverTransportFactory != null) {
             serverArgs.transportFactory(serverTransportFactory);
         }
-        BlockingQueue workQueue = new SynchronousQueue();
+        BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
         if (queueSize != null) {
-            workQueue = new ArrayBlockingQueue(queueSize);
+            workQueue = new ArrayBlockingQueue<>(queueSize);
         }
         ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                                                             60, TimeUnit.SECONDS, workQueue);
@@ -92,6 +93,10 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         return new TThreadPoolServer(serverArgs);
     }
 
+    @Override
+    public void close() {
+    }
+
     /**
      * Create the transport factory needed for serving.  All subclass must implement this method.
      *
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
index 6c7dbb6..f321221 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -12,6 +12,8 @@
 
 package org.apache.storm.security.auth.workertoken;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Base64;
 import java.util.Map;
 import java.util.Optional;
@@ -39,9 +41,10 @@ import org.slf4j.LoggerFactory;
 /**
  * Allow for SASL authentication using worker tokens.
  */
-public class WorkerTokenAuthorizer implements PasswordProvider {
+public class WorkerTokenAuthorizer implements PasswordProvider, Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
     private final LoadingCache<WorkerTokenInfo, PrivateWorkerKey> keyCache;
+    private final IStormClusterState state;
 
     /**
      * Constructor.
@@ -72,6 +75,7 @@ public class WorkerTokenAuthorizer implements PasswordProvider {
                             });
         }
         keyCache = tmpKeyCache;
+        this.state = state;
     }
 
     private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
@@ -141,4 +145,11 @@ public class WorkerTokenAuthorizer implements PasswordProvider {
         WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
         return deser.get_userName();
     }
+
+    @Override
+    public void close() {
+        if (state != null) {
+            state.disconnect();
+        }
+    }
 }
\ No newline at end of file
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
index 477bf05..19f016f 100644
--- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
+++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
@@ -46,14 +46,13 @@ import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.messaging.TransportFactory;
 import org.apache.storm.utils.Utils;
 import org.junit.Test;
-import org.mockito.internal.matchers.LessThan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class NettyTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyTest.class);
-
+    
     private final AtomicBoolean[] remoteBpStatus = new AtomicBoolean[]{new AtomicBoolean(), new AtomicBoolean()};
     private final int taskId = 1;
 
@@ -114,9 +113,8 @@ public class NettyTest {
         IContext context = TransportFactory.makeContext(stormConf);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
-            try (IConnection server = context.bind(null, 0);
+            try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
                 IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback(response::set));
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
@@ -176,9 +174,8 @@ public class NettyTest {
         IContext context = TransportFactory.makeContext(stormConf);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
-            try (IConnection server = context.bind(null, 0);
+            try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
                 IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback(response::set));
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
@@ -231,9 +228,8 @@ public class NettyTest {
         IContext context = TransportFactory.makeContext(stormConf);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
-            try (IConnection server = context.bind(null, 0);
+            try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
                 IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback(response::set));
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
@@ -278,8 +274,7 @@ public class NettyTest {
                     CompletableFuture<?> serverStart = CompletableFuture.runAsync(() -> {
                         try {
                             Thread.sleep(100);
-                            server.set(context.bind(null, port));
-                            server.get().registerRecv(mkConnectionCallback(response::set));
+                            server.set(context.bind(null, port, mkConnectionCallback(response::set), null));
                             waitUntilReady(client, server.get());
                         } catch (Exception e) {
                             throw Utils.wrapInRuntime(e);
@@ -322,12 +317,11 @@ public class NettyTest {
         AtomicInteger received = new AtomicInteger();
         IContext context = TransportFactory.makeContext(stormConf);
         try {
-            try (IConnection server = context.bind(null, 0);
-                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback((message) -> {
+            try (IConnection server = context.bind(null, 0, mkConnectionCallback((message) -> {
                     responses.add(message);
                     received.incrementAndGet();
-                }));
+                }), null);
+                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
                 waitUntilReady(client, server);
 
                 IntStream.range(1, numMessages)
@@ -375,8 +369,7 @@ public class NettyTest {
             try (IConnection client = context.connect(null, "localhost", port, remoteBpStatus)) {
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
                 send(client, taskId, messageBytes);
-                try (IConnection server = context.bind(null, port)) {
-                    server.registerRecv(mkConnectionCallback(response::set));
+                try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null)) {
                     waitUntilReady(client, server);
                     send(client, taskId, messageBytes);
                     waitForNotNull(response);
@@ -406,9 +399,8 @@ public class NettyTest {
         IContext context = TransportFactory.makeContext(stormConf);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
-            try (IConnection server = context.bind(null, port);
+            try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null);
                  IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback(response::set));
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 222435f..eb7a746 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -134,7 +134,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
 
     private final Nimbus nimbus;
     //This is very private and does not need to be exposed
-    private final AtomicInteger portCounter;
+    private int portCounter;
     private final Map<String, Object> daemonConf;
     private final List<Supervisor> supervisors;
     private final IStateStorage state;
@@ -225,7 +225,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
             this.daemonConf = new HashMap<>(conf);
             this.metricRegistry = new StormMetricsRegistry();
 
-            this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
+            this.portCounter = builder.supervisorSlotPortMin;
             ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf);
             this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs);
             if (builder.clusterState == null) {
@@ -691,7 +691,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
 
         List<Integer> portNumbers = new ArrayList<>(ports.intValue());
         for (int i = 0; i < ports.intValue(); i++) {
-            portNumbers.add(portCounter.getAndIncrement());
+            portNumbers.add(portCounter++);
         }
 
         Map<String, Object> superConf = new HashMap<>(daemonConf);
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
index 77d9d01..b6bfd47 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -415,6 +415,7 @@ public class LocalFsBlobStore extends BlobStore {
         if (timer != null) {
             timer.cancel();;
         }
+        stormClusterState.disconnect();
     }
 
     @Override
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 4607862..60d5e61 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -835,7 +835,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * Set a new assignment asynchronously.
      * @param newAssignment the new assignment for this slot to run, null to run nothing
      */
-    public void setNewAssignment(LocalAssignment newAssignment) {
+    public final void setNewAssignment(LocalAssignment newAssignment) {
         this.newAssignment.set(newAssignment == null ? null : new TimerDecoratedAssignment(newAssignment, staticState.slotMetrics.workerLaunchDuration));
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index f1bc79b..fc80b6b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -343,9 +343,7 @@ public class AsyncLocalizer implements AutoCloseable {
 
     @Override
     public void close() throws InterruptedException {
-        if (execService != null) {
-            execService.shutdown();
-        }
+        execService.shutdown();
     }
 
     private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
diff --git a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
index 3f1fb04..2d00680 100644
--- a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
+++ b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
@@ -81,10 +81,11 @@ public class WorkerTokenTest {
             assertEquals(ONE_DAY_MILLIS, info.get_expirationTimeMillis());
             assertEquals(versionNumber, info.get_secretVersion());
 
-            //Verify the signature...
-            WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState);
-            byte[] signature = wta.getSignedPasswordFor(wt.get_info(), info);
-            assertArrayEquals(wt.get_signature(), signature);
+            try (WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState)) {
+                //Verify the signature...
+                byte[] signature = wta.getSignedPasswordFor(wt.get_info(), info);
+                assertArrayEquals(wt.get_signature(), signature);
+            }
         }
     }
 
@@ -135,13 +136,14 @@ public class WorkerTokenTest {
             //Expire the token
             Time.advanceTime(ONE_DAY_MILLIS + 1);
 
-            //Verify the signature...
-            WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState);
-            try {
-                wta.getSignedPasswordFor(wt.get_info(), info);
-                fail("Expected an expired token to not be signed!!!");
-            } catch (IllegalArgumentException ia) {
-                //What we want...
+            try (WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState)) {
+                try {
+                    //Verify the signature...
+                    wta.getSignedPasswordFor(wt.get_info(), info);
+                    fail("Expected an expired token to not be signed!!!");
+                } catch (IllegalArgumentException ia) {
+                    //What we want...
+                }
             }
 
             //Verify if WorkerTokenManager recognizes the expired WorkerToken.