You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/05/05 14:14:07 UTC

[storm] branch master updated (8951e7a -> 26e6c06)

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git.


    from 8951e7a  Merge branch 'STORM-3373' of https://github.com/srdo/storm into STORM-3373-merge
     new aaf1113  STORM-3376: Set Server callback before opening Netty socket, so we don't drop messages during startup
     new 2bf972e  Merge branch 'STORM-3376' of https://github.com/srdo/storm into asfgit-master
     new c985695  STORM-3379: Fix intermittent NPE during worker boot in local mode
     new 26e6c06  Merge branch 'STORM-3379' of https://github.com/srdo/storm into asfgit-master

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 integration-test/config/storm.yaml                 |   6 ++
 .../test/org/apache/storm/sql/TestStormSql.java    |  45 ++++-----
 .../backends/streams/TestPlanCompiler.java         |  96 +++++++++---------
 .../src/test/org/apache/storm/sql/TestUtils.java   |  23 ++---
 .../org/apache/storm/cluster/IStateStorage.java    |   4 +-
 .../storm/cluster/StormClusterStateImpl.java       |   8 +-
 .../jvm/org/apache/storm/daemon/worker/Worker.java |  30 ++++--
 .../apache/storm/daemon/worker/WorkerState.java    |  83 ++++++++--------
 .../org/apache/storm/messaging/IConnection.java    |  14 ---
 .../jvm/org/apache/storm/messaging/IContext.java   |   5 +-
 .../org/apache/storm/messaging/local/Context.java  |  98 +++++++++----------
 .../org/apache/storm/messaging/netty/Client.java   |  15 ---
 .../org/apache/storm/messaging/netty/Context.java  |   6 +-
 .../org/apache/storm/messaging/netty/Server.java   |  29 +++---
 .../apache/storm/security/auth/ThriftServer.java   |   8 +-
 .../auth/digest/DigestSaslTransportPlugin.java     |  11 ++-
 .../auth/kerberos/KerberosSaslTransportPlugin.java |  11 ++-
 .../security/auth/sasl/SaslTransportPlugin.java    |  11 ++-
 .../auth/workertoken/WorkerTokenAuthorizer.java    |  15 ++-
 .../org/apache/storm/utils/SupervisorClient.java   |   5 +-
 .../apache/storm/utils/SupervisorIfaceFactory.java |  13 ++-
 .../apache/storm/messaging/netty/NettyTest.java    |  28 ++----
 .../main/java/org/apache/storm/LocalCluster.java   |   6 +-
 .../apache/storm/blobstore/LocalFsBlobStore.java   |   1 +
 .../apache/storm/daemon/supervisor/Container.java  |   4 +-
 .../storm/daemon/supervisor/ContainerLauncher.java |   7 +-
 .../storm/daemon/supervisor/LocalContainer.java    |  10 +-
 .../daemon/supervisor/LocalContainerLauncher.java  |   7 +-
 .../storm/daemon/supervisor/ReadClusterState.java  |   3 +-
 .../org/apache/storm/daemon/supervisor/Slot.java   |   2 +-
 .../apache/storm/daemon/supervisor/Supervisor.java | 108 ++++++++++++---------
 .../org/apache/storm/localizer/AsyncLocalizer.java |   4 +-
 .../nimbus/AssignmentDistributionService.java      |   2 +-
 .../security/auth/workertoken/WorkerTokenTest.java |  24 ++---
 34 files changed, 389 insertions(+), 353 deletions(-)
 copy storm-server/src/main/java/org/apache/storm/daemon/drpc/RequestFactory.java => storm-client/src/jvm/org/apache/storm/utils/SupervisorIfaceFactory.java (78%)


[storm] 01/04: STORM-3376: Set Server callback before opening Netty socket, so we don't drop messages during startup

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit aaf1113360ce151d86948860c2f290befa79abb8
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Fri Apr 12 20:25:42 2019 +0200

    STORM-3376: Set Server callback before opening Netty socket, so we don't drop messages during startup
---
 integration-test/config/storm.yaml                 |  6 ++
 .../test/org/apache/storm/sql/TestStormSql.java    | 45 +++++-----
 .../backends/streams/TestPlanCompiler.java         | 96 ++++++++++-----------
 .../src/test/org/apache/storm/sql/TestUtils.java   | 23 ++---
 .../org/apache/storm/cluster/IStateStorage.java    |  4 +-
 .../storm/cluster/StormClusterStateImpl.java       |  8 +-
 .../jvm/org/apache/storm/daemon/worker/Worker.java |  4 +-
 .../apache/storm/daemon/worker/WorkerState.java    | 40 ++++-----
 .../org/apache/storm/messaging/IConnection.java    | 14 ----
 .../jvm/org/apache/storm/messaging/IContext.java   |  5 +-
 .../org/apache/storm/messaging/local/Context.java  | 98 ++++++++++------------
 .../org/apache/storm/messaging/netty/Client.java   | 15 ----
 .../org/apache/storm/messaging/netty/Context.java  |  6 +-
 .../org/apache/storm/messaging/netty/Server.java   | 29 +++----
 .../apache/storm/security/auth/ThriftServer.java   |  8 +-
 .../auth/digest/DigestSaslTransportPlugin.java     | 11 ++-
 .../auth/kerberos/KerberosSaslTransportPlugin.java | 11 ++-
 .../security/auth/sasl/SaslTransportPlugin.java    | 11 ++-
 .../auth/workertoken/WorkerTokenAuthorizer.java    | 13 ++-
 .../apache/storm/messaging/netty/NettyTest.java    | 28 +++----
 .../main/java/org/apache/storm/LocalCluster.java   |  6 +-
 .../apache/storm/blobstore/LocalFsBlobStore.java   |  1 +
 .../org/apache/storm/daemon/supervisor/Slot.java   |  2 +-
 .../org/apache/storm/localizer/AsyncLocalizer.java |  4 +-
 .../security/auth/workertoken/WorkerTokenTest.java | 24 +++---
 25 files changed, 247 insertions(+), 265 deletions(-)

diff --git a/integration-test/config/storm.yaml b/integration-test/config/storm.yaml
index 67784e7..774497f 100644
--- a/integration-test/config/storm.yaml
+++ b/integration-test/config/storm.yaml
@@ -35,3 +35,9 @@ drpc.servers:
   - "node1"
 
 supervisor.slots.ports: [6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 6709]
+
+# Enable assertions
+nimbus.childopts: "-Xmx1024m -ea"
+supervisor.childopts: "-Xmx256m -ea"
+worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump -ea"
+ui.childopts: "-Xmx768m -ea"
\ No newline at end of file
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 5a30764..3958894 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -29,28 +29,24 @@ import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.tuple.Values;
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(TestUtils.MockBoltExtension.class)
+@ExtendWith(TestUtils.MockInsertBoltExtension.class)
 public class TestStormSql {
 
     public static final int WAIT_TIMEOUT_MS = 1000 * 1000;
     public static final int WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED = 1000 * 10;
     public static final int WAIT_TIMEOUT_MS_ERROR_EXPECTED = 1000;
 
-    @Rule
-    public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
-
-    @Rule
-    public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
-
     private static LocalCluster cluster;
 
-    @BeforeClass
+    @BeforeAll
     public static void staticSetup() throws Exception {
         DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
         DataSourcesRegistry.providerMap().put("mocknested", new MockNestedDataSourceProvider());
@@ -61,7 +57,7 @@ public class TestStormSql {
         cluster = new LocalCluster();
     }
 
-    @AfterClass
+    @AfterAll
     public static void staticCleanup() {
         DataSourcesRegistry.providerMap().remove("mock");
         DataSourcesRegistry.providerMap().remove("mocknested");
@@ -176,7 +172,7 @@ public class TestStormSql {
         Assert.assertEquals(0, values.size());
     }
 
-    @Test(expected = ValidationException.class)
+    @Test
     public void testExternalUdfType() throws Exception {
         List<String> stmt = new ArrayList<>();
         stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
@@ -185,12 +181,11 @@ public class TestStormSql {
         stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
-        Assert.fail("Should raise ValidationException.");
+        Assertions.assertThrows(ValidationException.class,
+            () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
     }
 
-    @Test(expected = CompilingClassLoader.CompilerException.class)
+    @Test
     public void testExternalUdfType2() throws Exception {
         List<String> stmt = new ArrayList<>();
         // generated code will be not compilable since return type of MYPLUS and type of 'x' are different
@@ -200,9 +195,8 @@ public class TestStormSql {
         stmt.add("INSERT INTO BAR SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
-        Assert.fail("Should raise CompilerException.");
+        Assertions.assertThrows(CompilingClassLoader.CompilerException.class,
+            () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
     }
 
     @Test
@@ -222,7 +216,7 @@ public class TestStormSql {
         Assert.assertEquals(5, values.get(1).getFirst());
     }
 
-    @Test(expected = UnsupportedOperationException.class)
+    @Test
     public void testExternalUdfUsingJar() throws Exception {
         List<String> stmt = new ArrayList<>();
         stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
@@ -231,9 +225,8 @@ public class TestStormSql {
         stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
         StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
 
-        impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
-        Assert.fail("Should raise UnsupportedOperationException.");
+        Assertions.assertThrows(UnsupportedOperationException.class,
+            () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
     }
 
     private static class MockDataSourceProvider implements DataSourcesProvider {
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
index 68203d8..21bc6f3 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
@@ -33,35 +33,31 @@ import org.apache.storm.sql.planner.streams.QueryPlanner;
 import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
 import org.apache.storm.streams.Pair;
 import org.apache.storm.tuple.Values;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
 
 import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(TestUtils.MockBoltExtension.class)
+@ExtendWith(TestUtils.MockInsertBoltExtension.class)
 public class TestPlanCompiler {
-    private static LocalCluster cluster;
 
-    @Rule
-    public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
-
-    @Rule
-    public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
+    private static LocalCluster cluster;
 
-    @BeforeClass
+    @BeforeAll
     public static void staticSetup() throws Exception {
         cluster = new LocalCluster();
     }
 
-    @AfterClass
+    @AfterAll
     public static void staticCleanup() {
-        if (cluster!= null) {
+        if (cluster != null) {
             cluster.shutdown();
             cluster = null;
         }
@@ -82,7 +78,7 @@ public class TestPlanCompiler {
         final StormTopology topo = proc.build();
 
         SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
-        Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, TestUtils.MockBolt.getCollectedValues().toArray());
+        Assert.assertArrayEquals(new Values[]{new Values(3), new Values(4)}, TestUtils.MockBolt.getCollectedValues().toArray());
     }
 
     @Test
@@ -99,15 +95,15 @@ public class TestPlanCompiler {
         final StormTopology topo = proc.build();
 
         SqlTestUtil.runStormTopology(cluster, TestUtils.MockInsertBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
-        Assert.assertArrayEquals(new Pair[] { Pair.of(4, new Values(4, "abcde", "y")) }, TestUtils.MockInsertBolt.getCollectedValues().toArray());
+        Assert.assertArrayEquals(new Pair[]{Pair.of(4, new Values(4, "abcde", "y"))}, TestUtils.MockInsertBolt.getCollectedValues().toArray());
     }
 
     @Test
     public void testUdf() throws Exception {
         int EXPECTED_VALUE_SIZE = 1;
-        String sql = "SELECT MYPLUS(ID, 3)" +
-                "FROM FOO " +
-                "WHERE ID = 2";
+        String sql = "SELECT MYPLUS(ID, 3)"
+            + "FROM FOO "
+            + "WHERE ID = 2";
         TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
         Map<String, ISqlStreamsDataSource> data = new HashMap<>();
         data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
@@ -119,15 +115,15 @@ public class TestPlanCompiler {
         final StormTopology topo = proc.build();
 
         SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
-        Assert.assertArrayEquals(new Values[] { new Values(5) }, TestUtils.MockBolt.getCollectedValues().toArray());
+        Assert.assertArrayEquals(new Values[]{new Values(5)}, TestUtils.MockBolt.getCollectedValues().toArray());
     }
 
     @Test
     public void testNested() throws Exception {
         int EXPECTED_VALUE_SIZE = 1;
-        String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
-                "FROM FOO " +
-                "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+        String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD "
+            + "FROM FOO "
+            + "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
         TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
 
         final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -144,23 +140,23 @@ public class TestPlanCompiler {
         Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
         Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
         Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))},
-                TestUtils.MockBolt.getCollectedValues().toArray());
+            TestUtils.MockBolt.getCollectedValues().toArray());
     }
 
     /**
-     * All the binary literal tests are done here, because Avatica converts the result to byte[]
-     * whereas Stream provides the result to ByteString which makes different semantic from Stream implementation.
+     * All the binary literal tests are done here, because Avatica converts the result to byte[] whereas Stream provides the result to
+     * ByteString which makes different semantic from Stream implementation.
      */
     @Test
     public void testBinaryStringFunctions() throws Exception {
         int EXPECTED_VALUE_SIZE = 1;
-        String sql = "SELECT x'45F0AB' || x'45F0AB', " +
-                "POSITION(x'F0' IN x'453423F0ABBC'), " +
-                "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), " +
-                "SUBSTRING(x'453423F0ABBC' FROM 3), " +
-                "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) " +
-                "FROM FOO " +
-                "WHERE ID > 0 AND ID < 2";
+        String sql = "SELECT x'45F0AB' || x'45F0AB', "
+            + "POSITION(x'F0' IN x'453423F0ABBC'), "
+            + "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), "
+            + "SUBSTRING(x'453423F0ABBC' FROM 3), "
+            + "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) "
+            + "FROM FOO "
+            + "WHERE ID > 0 AND ID < 2";
 
         TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
         final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -190,19 +186,19 @@ public class TestPlanCompiler {
     @Test
     public void testDateKeywordsAndFunctions() throws Exception {
         int EXPECTED_VALUE_SIZE = 1;
-        String sql = "SELECT " +
-                "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, " +
-                "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, " +
-                "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')," +
-                "FLOOR(DATE '2016-01-23' TO MONTH)," +
-                "CEIL(TIME '12:34:56' TO MINUTE)," +
-                "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP," +
-                "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}," +
-                "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}," +
-                "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, " +
-                "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field "   +
-                "FROM FOO " +
-                "WHERE ID > 0 AND ID < 2";
+        String sql = "SELECT "
+            + "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, "
+            + "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, "
+            + "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56'),"
+            + "FLOOR(DATE '2016-01-23' TO MONTH),"
+            + "CEIL(TIME '12:34:56' TO MINUTE),"
+            + "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP,"
+            + "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')},"
+            + "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')},"
+            + "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, "
+            + "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field "
+            + "FROM FOO "
+            + "WHERE ID > 0 AND ID < 2";
         TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
 
         final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -229,7 +225,7 @@ public class TestPlanCompiler {
         int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY);
 
         Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt,
-                        134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)},
-                TestUtils.MockBolt.getCollectedValues().toArray());
+            134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)},
+            TestUtils.MockBolt.getCollectedValues().toArray());
     }
-}
\ No newline at end of file
+}
diff --git a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index f728bf6..a1aa552 100644
--- a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -39,22 +39,23 @@ import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.junit.rules.ExternalResource;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
 
 public class TestUtils {
-  public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() {
-    @Override
-    protected void before() throws Throwable {
-      MockInsertBolt.getCollectedValues().clear();
+    public static final class MockInsertBoltExtension implements BeforeEachCallback {
+        @Override
+        public void beforeEach(ExtensionContext ctx) throws Exception {
+            MockInsertBolt.getCollectedValues().clear();
+        }
     }
-  };
 
-  public static final ExternalResource mockBoltValueResource = new ExternalResource() {
-    @Override
-    protected void before() throws Throwable {
-      MockBolt.getCollectedValues().clear();
+    public static final class MockBoltExtension implements BeforeEachCallback {
+        @Override
+        public void beforeEach(ExtensionContext arg0) throws Exception {
+            MockBolt.getCollectedValues().clear();
+        }
     }
-  };
 
   public static class MyPlus {
     public static Integer evaluate(Integer x, Integer y) {
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
index 0889a26..b673932 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.cluster;
 
+import java.io.Closeable;
 import java.util.List;
 import org.apache.storm.callback.ZKStateChangedCallback;
 import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener;
@@ -27,7 +28,7 @@ import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
  * For example, performing these two calls: set_data("/path", data, acls); void set_worker_hb("/path", heartbeat, acls); may or may not
  * cause a collision in "/path". Never use the same paths with the *_hb* methods as you do with the others.
  */
-public interface IStateStorage {
+public interface IStateStorage extends Closeable {
 
     /**
      * Registers a callback function that gets called when CuratorEvents happen.
@@ -115,6 +116,7 @@ public interface IStateStorage {
     /**
      * Close the connection to the data store.
      */
+    @Override
     void close();
 
     /**
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 644f465..f330278 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -56,7 +56,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
     private final List<ACL> defaultAcls;
     private final String stateId;
-    private final boolean solo;
+    private final boolean shouldCloseStateStorageOnDisconnect;
     private final ClusterStateContext context;
     private IStateStorage stateStorage;
     private ILocalAssignmentsBackend assignmentsBackend;
@@ -74,10 +74,10 @@ public class StormClusterStateImpl implements IStormClusterState {
     private ConcurrentHashMap<String, Runnable> logConfigCallback;
 
     public StormClusterStateImpl(IStateStorage StateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend,
-                                 ClusterStateContext context, boolean solo) throws Exception {
+                                 ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception {
 
         this.stateStorage = StateStorage;
-        this.solo = solo;
+        this.shouldCloseStateStorageOnDisconnect = shouldCloseStateStorageOnDisconnect;
         this.defaultAcls = context.getDefaultZkAcls();
         this.context = context;
         this.assignmentsBackend = assignmentsassignmentsBackend;
@@ -831,7 +831,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     @Override
     public void disconnect() {
         stateStorage.unregister(stateId);
-        if (solo) {
+        if (shouldCloseStateStorageOnDisconnect) {
             stateStorage.close();
             this.assignmentsBackend.close();
         }
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 05a79c6..e950c051 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -197,9 +197,7 @@ public class Worker implements Shutdownable, DaemonCommon {
             .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
                                Worker.this::doExecutorHeartbeats);
 
-        workerState.registerCallbacks();
-
-        workerState.refreshConnections(null);
+        workerState.refreshConnections();
 
         workerState.activateWorkerWhenAllConnectionsReady();
 
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 0c13bec..2aa96a9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.StormTimer;
@@ -57,6 +58,7 @@ import org.apache.storm.hooks.IWorkerHook;
 import org.apache.storm.messaging.ConnectionWithStatus;
 import org.apache.storm.messaging.DeserializingConnectionCallback;
 import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IConnectionCallback;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.messaging.TransportFactory;
 import org.apache.storm.messaging.netty.BackPressureStatus;
@@ -158,7 +160,6 @@ public class WorkerState {
         this.conf = conf;
         this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
         this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
-        this.receiver = this.mqContext.bind(topologyId, port);
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
         this.supervisorPort = supervisorPort;
@@ -215,6 +216,16 @@ public class WorkerState {
         this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);
         this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue);
         this.deserializedWorkerHooks = deserializeWorkerHooks();
+        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
+        IConnectionCallback cb = new DeserializingConnectionCallback(topologyConf,
+            getWorkerTopologyContext(),
+            this::transferLocalBatch);
+        Supplier<Object> newConnectionResponse = () -> {
+            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+            LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
+            return bpStatus;
+        };
+        this.receiver = this.mqContext.bind(topologyId, port, cb, newConnectionResponse);
     }
 
     private static double getQueueLoad(JCQueue q) {
@@ -356,19 +367,11 @@ public class WorkerState {
         return userTimer;
     }
 
-    public void refreshConnections() {
-        try {
-            refreshConnections(() -> refreshConnectionsTimer.schedule(0, this::refreshConnections));
-        } catch (Exception e) {
-            throw Utils.wrapInRuntime(e);
-        }
-    }
-
     public SmartThread makeTransferThread() {
         return workerTransfer.makeTransferThread();
     }
 
-    public void refreshConnections(Runnable callback) throws Exception {
+    public void refreshConnections() {
         Assignment assignment = getLocalAssignment(conf, stormClusterState, topologyId);
 
         Set<NodeInfo> neededConnections = new HashSet<>();
@@ -497,21 +500,6 @@ public class WorkerState {
         );
     }
 
-    public void registerCallbacks() {
-        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
-        receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
-                                                                  getWorkerTopologyContext(),
-                                                                  this::transferLocalBatch));
-        // Send curr BackPressure status to new clients
-        receiver.registerNewConnectionResponse(
-            () -> {
-                BackPressureStatus bpStatus = bpTracker.getCurrStatus();
-                LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
-                return bpStatus;
-            }
-        );
-    }
-
     /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
     public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
         return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
@@ -576,7 +564,7 @@ public class WorkerState {
         }
     }
 
-    public WorkerTopologyContext getWorkerTopologyContext() {
+    public final WorkerTopologyContext getWorkerTopologyContext() {
         try {
             String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
             String pidDir = ConfigUtils.workerPidsRoot(conf, topologyId);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
index c2e156c..f713c7f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -22,20 +22,6 @@ import org.apache.storm.messaging.netty.BackPressureStatus;
 public interface IConnection extends AutoCloseable {
 
     /**
-     * Register a callback to be notified when data is ready to be processed.
-     *
-     * @param cb the callback to process the messages.
-     */
-    void registerRecv(IConnectionCallback cb);
-
-    /**
-     * Register a response generator to be used to send an initial response when a new client connects.
-     *
-     * @param cb the callback to process the connection.
-     */
-    void registerNewConnectionResponse(Supplier<Object> cb);
-
-    /**
      * Send load metrics to all downstream connections.
      *
      * @param taskToLoad a map from the task id to the load for that task.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index 8d2c0dc..057ae30 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -14,6 +14,7 @@ package org.apache.storm.messaging;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 /**
  * This interface needs to be implemented for messaging plugin.
@@ -41,9 +42,11 @@ public interface IContext {
      *
      * @param storm_id topology ID
      * @param port     port #
+     * @param cb The callback to deliver received messages to
+     * @param newConnectionResponse Supplier of the initial message to send to new client connections
      * @return server side connection
      */
-    IConnection bind(String storm_id, int port);
+    IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse);
 
     /**
      * This method establish a client side connection to a remote server
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
index 6071cbe..2737dfb 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -15,6 +15,7 @@ package org.apache.storm.messaging.local;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -37,36 +38,36 @@ import org.slf4j.LoggerFactory;
 
 public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
-    private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
-
-    private static LocalServer getLocalServer(String nodeId, int port) {
-        String key = nodeId + "-" + port;
-        LocalServer ret = _registry.get(key);
-        if (ret == null) {
-            ret = new LocalServer(port);
-            LocalServer tmp = _registry.putIfAbsent(key, ret);
-            if (tmp != null) {
-                ret = tmp;
-            }
+    private final ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
+
+    private static String getNodeKey(String nodeId, int port) {
+        return nodeId + "-" + port;
+    }
+    
+    private LocalServer createLocalServer(String nodeId, int port, IConnectionCallback cb) {
+        String key = getNodeKey(nodeId, port);
+        LocalServer ret = new LocalServer(port, cb);
+        LocalServer existing = _registry.put(key, ret);
+        if (existing != null) {
+            //Can happen if worker is restarted in the same topology, e.g. due to blob update
+            LOG.info("Replacing existing server for key {}", existing, ret, key);
         }
         return ret;
     }
 
-    ;
-
     @Override
     public void prepare(Map<String, Object> topoConf) {
         //NOOP
     }
 
     @Override
-    public IConnection bind(String storm_id, int port) {
-        return getLocalServer(storm_id, port);
+    public IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
+        return createLocalServer(storm_id, port, cb);
     }
 
     @Override
     public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
-        return new LocalClient(getLocalServer(storm_id, port));
+        return new LocalClient(storm_id, port);
     }
 
     @Override
@@ -74,25 +75,16 @@ public class Context implements IContext {
         //NOOP
     }
 
-    private static class LocalServer implements IConnection {
+    private class LocalServer implements IConnection {
         final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
         final int port;
-        volatile IConnectionCallback _cb;
+        final IConnectionCallback _cb;
 
-        public LocalServer(int port) {
+        public LocalServer(int port, IConnectionCallback cb) {
             this.port = port;
+            this._cb = cb;
         }
-
-        @Override
-        public void registerRecv(IConnectionCallback cb) {
-            _cb = cb;
-        }
-
-        @Override
-        public void registerNewConnectionResponse(Supplier<Object> cb) {
-            return;
-        }
-
+        
         @Override
         public void send(Iterator<TaskMessage> msgs) {
             throw new IllegalArgumentException("SHOULD NOT HAPPEN");
@@ -131,14 +123,16 @@ public class Context implements IContext {
         }
     }
 
-    private static class LocalClient implements IConnection {
-        private final LocalServer _server;
+    private class LocalClient implements IConnection {
         //Messages sent before the server registered a callback
         private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
         private final ScheduledExecutorService _pendingFlusher;
+        private final int port;
+        private final String registryKey;
 
-        public LocalClient(LocalServer server) {
-            _server = server;
+        public LocalClient(String stormId, int port) {
+            this.port = port;
+            this.registryKey = getNodeKey(stormId, port);
             _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
             _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                 @Override
@@ -163,35 +157,26 @@ public class Context implements IContext {
             }, 5, 5, TimeUnit.SECONDS);
         }
 
-        @Override
-        public void registerRecv(IConnectionCallback cb) {
-            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
-        }
-
-        @Override
-        public void registerNewConnectionResponse(Supplier<Object> cb) {
-            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
-        }
-
         private void flushPending() {
-            IConnectionCallback serverCb = _server._cb;
-            if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
+            //Can't cache server in client, server can change when workers restart.
+            LocalServer server = _registry.get(registryKey);
+            if (server != null && !_pendingDueToUnregisteredServer.isEmpty()) {
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 _pendingDueToUnregisteredServer.drainTo(ret);
-                serverCb.recv(ret);
+                server._cb.recv(ret);
             }
         }
 
         @Override
         public void send(Iterator<TaskMessage> msgs) {
-            IConnectionCallback serverCb = _server._cb;
-            if (serverCb != null) {
+            LocalServer server = _registry.get(registryKey);
+            if (server != null) {
                 flushPending();
                 ArrayList<TaskMessage> ret = new ArrayList<>();
                 while (msgs.hasNext()) {
                     ret.add(msgs.next());
                 }
-                serverCb.recv(ret);
+                server._cb.recv(ret);
             } else {
                 while (msgs.hasNext()) {
                     _pendingDueToUnregisteredServer.add(msgs.next());
@@ -201,12 +186,19 @@ public class Context implements IContext {
 
         @Override
         public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
-            return _server.getLoad(tasks);
+            LocalServer server = _registry.get(registryKey);
+            if (server != null) {
+                return server.getLoad(tasks);
+            }
+            return Collections.emptyMap();
         }
 
         @Override
         public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
-            _server.sendLoadMetrics(taskToLoad);
+            LocalServer server = _registry.get(registryKey);
+            if (server != null) {
+                server.sendLoadMetrics(taskToLoad);
+            }
         }
 
         @Override
@@ -216,7 +208,7 @@ public class Context implements IContext {
 
         @Override
         public int getPort() {
-            return _server.getPort();
+            return port;
         }
 
         @Override
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index fe2fe16..61a9c99 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -236,21 +236,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
         }
     }
 
-    /**
-     * Receiving messages is not supported by a client.
-     *
-     * @throws java.lang.UnsupportedOperationException whenever this method is being called.
-     */
-    @Override
-    public void registerRecv(IConnectionCallback cb) {
-        throw new UnsupportedOperationException("Client connection should not receive any messages");
-    }
-
-    @Override
-    public void registerNewConnectionResponse(Supplier<Object> cb) {
-        throw new UnsupportedOperationException("Client does not accept new connections");
-    }
-
     @Override
     public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
         throw new RuntimeException("Client connection should not send load metrics");
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 27ccd04..ca46c4f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -17,8 +17,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import org.apache.storm.Config;
 import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IConnectionCallback;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
 import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
@@ -53,8 +55,8 @@ public class Context implements IContext {
      * establish a server with a binding port
      */
     @Override
-    public synchronized IConnection bind(String storm_id, int port) {
-        Server server = new Server(topoConf, port);
+    public synchronized IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
+        Server server = new Server(topoConf, port, cb, newConnectionResponse);
         serverConnections.add(server);
         return server;
     }
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 7d150c3..a3cd8b0 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -62,14 +62,23 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     private final int port;
     private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
     private final KryoValuesSerializer ser;
+    private final IConnectionCallback cb;
+    private final Supplier<Object> newConnectionResponse;
     private volatile boolean closing = false;
-    private IConnectionCallback cb = null;
-    private Supplier<Object> newConnectionResponse;
 
-    Server(Map<String, Object> topoConf, int port) {
+    /**
+     * Starts Netty at the given port
+     * @param topoConf The topology config
+     * @param port The port to start Netty at
+     * @param cb The callback to deliver incoming messages to
+     * @param newConnectionResponse The response to send to clients when they connect. Can be null.
+     */
+    Server(Map<String, Object> topoConf, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
         this.topoConf = topoConf;
         this.port = port;
         ser = new KryoValuesSerializer(topoConf);
+        this.cb = cb;
+        this.newConnectionResponse = newConnectionResponse;
 
         // Configure the server.
         int buffer_size = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -136,19 +145,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
             return;
         }
         addReceiveCount(from, msgs.size());
-        if (cb != null) {
-            cb.recv(msgs);
-        }
-    }
-
-    @Override
-    public void registerRecv(IConnectionCallback cb) {
-        this.cb = cb;
-    }
-
-    @Override
-    public void registerNewConnectionResponse(Supplier<Object> newConnectionResponse) {
-        this.newConnectionResponse = newConnectionResponse;
+        cb.recv(msgs);
     }
 
     @Override
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index eaeb17a..e83d37d 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -12,9 +12,11 @@
 
 package org.apache.storm.security.auth;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import javax.security.auth.login.Configuration;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
 import org.apache.storm.thrift.TProcessor;
 import org.apache.storm.thrift.server.TServer;
 import org.apache.storm.thrift.transport.TTransportException;
@@ -30,6 +32,7 @@ public class ThriftServer {
     private Configuration loginConf;
     private int port;
     private boolean areWorkerTokensSupported;
+    private ITransportPlugin transportPlugin;
 
     public ThriftServer(Map<String, Object> conf, TProcessor processor, ThriftConnectionType type) {
         this.conf = conf;
@@ -44,7 +47,7 @@ public class ThriftServer {
         }
         try {
             //locate our thrift transport plugin
-            ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
+            transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
             //server
             server = transportPlugin.getServer(this.processor);
             port = transportPlugin.getPort();
@@ -57,6 +60,9 @@ public class ThriftServer {
 
     public void stop() {
         server.stop();
+        if (transportPlugin instanceof SaslTransportPlugin) {
+            ((SaslTransportPlugin)transportPlugin).close();
+        }
     }
 
     /**
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index 54731ee..463b841 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -34,11 +34,15 @@ import org.slf4j.LoggerFactory;
 public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     public static final String DIGEST = "DIGEST-MD5";
     private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
+    private WorkerTokenAuthorizer workerTokenAuthorizer;
 
     protected TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
+        if (workerTokenAuthorizer == null) {
+            workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
+        }
         //create an authentication callback handler
         CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(impersonationAllowed,
-                                                                                    new WorkerTokenAuthorizer(conf, type),
+                                                                                    workerTokenAuthorizer,
                                                                                     new JassPasswordProvider(loginConf));
 
         //create a transport factory that will invoke our auth callback for digest
@@ -93,4 +97,9 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
     public boolean areWorkerTokensSupported() {
         return true;
     }
+
+    @Override
+    public void close() {
+        workerTokenAuthorizer.close();
+    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 081037b..27ea878 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -50,9 +50,13 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
     private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
     private static final String DISABLE_LOGIN_CACHE = "disableLoginCache";
     private static Map<LoginCacheKey, Login> loginCache = new ConcurrentHashMap<>();
+    private WorkerTokenAuthorizer workerTokenAuthorizer;
 
     @Override
     public TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
+        if (workerTokenAuthorizer == null) {
+            workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
+        }
         //create an authentication callback handler
         CallbackHandler server_callback_handler = new ServerCallbackHandler(loginConf, impersonationAllowed);
 
@@ -91,7 +95,7 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
 
         //Also add in support for worker tokens
         factory.addServerDefinition(DIGEST, ClientAuthUtils.SERVICE, hostName, null,
-                                    new SimpleSaslServerCallbackHandler(impersonationAllowed, new WorkerTokenAuthorizer(conf, type)));
+                                    new SimpleSaslServerCallbackHandler(impersonationAllowed, workerTokenAuthorizer));
 
         //create a wrap transport factory so that we could apply user credential during connections
         TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
@@ -234,6 +238,11 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
         return true;
     }
 
+    @Override
+    public void close() {
+        workerTokenAuthorizer.close();
+    }
+
     /**
      * A TransportFactory that wraps another one, but assumes a specified UGI before calling through.
      *
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
index d40ad6f..1694caa 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.security.auth.sasl;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.Socket;
 import java.security.Principal;
@@ -45,7 +46,7 @@ import org.apache.storm.utils.ExtendedThreadPoolExecutor;
 /**
  * Base class for SASL authentication plugin.
  */
-public abstract class SaslTransportPlugin implements ITransportPlugin {
+public abstract class SaslTransportPlugin implements ITransportPlugin, Closeable {
     protected ThriftConnectionType type;
     protected Map<String, Object> conf;
     protected Configuration loginConf;
@@ -82,9 +83,9 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         if (serverTransportFactory != null) {
             serverArgs.transportFactory(serverTransportFactory);
         }
-        BlockingQueue workQueue = new SynchronousQueue();
+        BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
         if (queueSize != null) {
-            workQueue = new ArrayBlockingQueue(queueSize);
+            workQueue = new ArrayBlockingQueue<>(queueSize);
         }
         ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                                                             60, TimeUnit.SECONDS, workQueue);
@@ -92,6 +93,10 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
         return new TThreadPoolServer(serverArgs);
     }
 
+    @Override
+    public void close() {
+    }
+
     /**
      * Create the transport factory needed for serving.  All subclass must implement this method.
      *
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
index 6c7dbb6..f321221 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -12,6 +12,8 @@
 
 package org.apache.storm.security.auth.workertoken;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Base64;
 import java.util.Map;
 import java.util.Optional;
@@ -39,9 +41,10 @@ import org.slf4j.LoggerFactory;
 /**
  * Allow for SASL authentication using worker tokens.
  */
-public class WorkerTokenAuthorizer implements PasswordProvider {
+public class WorkerTokenAuthorizer implements PasswordProvider, Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
     private final LoadingCache<WorkerTokenInfo, PrivateWorkerKey> keyCache;
+    private final IStormClusterState state;
 
     /**
      * Constructor.
@@ -72,6 +75,7 @@ public class WorkerTokenAuthorizer implements PasswordProvider {
                             });
         }
         keyCache = tmpKeyCache;
+        this.state = state;
     }
 
     private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
@@ -141,4 +145,11 @@ public class WorkerTokenAuthorizer implements PasswordProvider {
         WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
         return deser.get_userName();
     }
+
+    @Override
+    public void close() {
+        if (state != null) {
+            state.disconnect();
+        }
+    }
 }
\ No newline at end of file
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
index 477bf05..19f016f 100644
--- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
+++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
@@ -46,14 +46,13 @@ import org.apache.storm.messaging.TaskMessage;
 import org.apache.storm.messaging.TransportFactory;
 import org.apache.storm.utils.Utils;
 import org.junit.Test;
-import org.mockito.internal.matchers.LessThan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class NettyTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyTest.class);
-
+    
     private final AtomicBoolean[] remoteBpStatus = new AtomicBoolean[]{new AtomicBoolean(), new AtomicBoolean()};
     private final int taskId = 1;
 
@@ -114,9 +113,8 @@ public class NettyTest {
         IContext context = TransportFactory.makeContext(stormConf);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
-            try (IConnection server = context.bind(null, 0);
+            try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
                 IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback(response::set));
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
@@ -176,9 +174,8 @@ public class NettyTest {
         IContext context = TransportFactory.makeContext(stormConf);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
-            try (IConnection server = context.bind(null, 0);
+            try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
                 IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback(response::set));
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
@@ -231,9 +228,8 @@ public class NettyTest {
         IContext context = TransportFactory.makeContext(stormConf);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
-            try (IConnection server = context.bind(null, 0);
+            try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
                 IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback(response::set));
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
@@ -278,8 +274,7 @@ public class NettyTest {
                     CompletableFuture<?> serverStart = CompletableFuture.runAsync(() -> {
                         try {
                             Thread.sleep(100);
-                            server.set(context.bind(null, port));
-                            server.get().registerRecv(mkConnectionCallback(response::set));
+                            server.set(context.bind(null, port, mkConnectionCallback(response::set), null));
                             waitUntilReady(client, server.get());
                         } catch (Exception e) {
                             throw Utils.wrapInRuntime(e);
@@ -322,12 +317,11 @@ public class NettyTest {
         AtomicInteger received = new AtomicInteger();
         IContext context = TransportFactory.makeContext(stormConf);
         try {
-            try (IConnection server = context.bind(null, 0);
-                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback((message) -> {
+            try (IConnection server = context.bind(null, 0, mkConnectionCallback((message) -> {
                     responses.add(message);
                     received.incrementAndGet();
-                }));
+                }), null);
+                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
                 waitUntilReady(client, server);
 
                 IntStream.range(1, numMessages)
@@ -375,8 +369,7 @@ public class NettyTest {
             try (IConnection client = context.connect(null, "localhost", port, remoteBpStatus)) {
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
                 send(client, taskId, messageBytes);
-                try (IConnection server = context.bind(null, port)) {
-                    server.registerRecv(mkConnectionCallback(response::set));
+                try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null)) {
                     waitUntilReady(client, server);
                     send(client, taskId, messageBytes);
                     waitForNotNull(response);
@@ -406,9 +399,8 @@ public class NettyTest {
         IContext context = TransportFactory.makeContext(stormConf);
         try {
             AtomicReference<TaskMessage> response = new AtomicReference<>();
-            try (IConnection server = context.bind(null, port);
+            try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null);
                  IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
-                server.registerRecv(mkConnectionCallback(response::set));
                 waitUntilReady(client, server);
                 byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
 
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 222435f..eb7a746 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -134,7 +134,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
 
     private final Nimbus nimbus;
     //This is very private and does not need to be exposed
-    private final AtomicInteger portCounter;
+    private int portCounter;
     private final Map<String, Object> daemonConf;
     private final List<Supervisor> supervisors;
     private final IStateStorage state;
@@ -225,7 +225,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
             this.daemonConf = new HashMap<>(conf);
             this.metricRegistry = new StormMetricsRegistry();
 
-            this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
+            this.portCounter = builder.supervisorSlotPortMin;
             ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf);
             this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs);
             if (builder.clusterState == null) {
@@ -691,7 +691,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
 
         List<Integer> portNumbers = new ArrayList<>(ports.intValue());
         for (int i = 0; i < ports.intValue(); i++) {
-            portNumbers.add(portCounter.getAndIncrement());
+            portNumbers.add(portCounter++);
         }
 
         Map<String, Object> superConf = new HashMap<>(daemonConf);
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
index 77d9d01..b6bfd47 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -415,6 +415,7 @@ public class LocalFsBlobStore extends BlobStore {
         if (timer != null) {
             timer.cancel();;
         }
+        stormClusterState.disconnect();
     }
 
     @Override
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 4607862..60d5e61 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -835,7 +835,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
      * Set a new assignment asynchronously.
      * @param newAssignment the new assignment for this slot to run, null to run nothing
      */
-    public void setNewAssignment(LocalAssignment newAssignment) {
+    public final void setNewAssignment(LocalAssignment newAssignment) {
         this.newAssignment.set(newAssignment == null ? null : new TimerDecoratedAssignment(newAssignment, staticState.slotMetrics.workerLaunchDuration));
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index f1bc79b..fc80b6b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -343,9 +343,7 @@ public class AsyncLocalizer implements AutoCloseable {
 
     @Override
     public void close() throws InterruptedException {
-        if (execService != null) {
-            execService.shutdown();
-        }
+        execService.shutdown();
     }
 
     private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
diff --git a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
index 3f1fb04..2d00680 100644
--- a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
+++ b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
@@ -81,10 +81,11 @@ public class WorkerTokenTest {
             assertEquals(ONE_DAY_MILLIS, info.get_expirationTimeMillis());
             assertEquals(versionNumber, info.get_secretVersion());
 
-            //Verify the signature...
-            WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState);
-            byte[] signature = wta.getSignedPasswordFor(wt.get_info(), info);
-            assertArrayEquals(wt.get_signature(), signature);
+            try (WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState)) {
+                //Verify the signature...
+                byte[] signature = wta.getSignedPasswordFor(wt.get_info(), info);
+                assertArrayEquals(wt.get_signature(), signature);
+            }
         }
     }
 
@@ -135,13 +136,14 @@ public class WorkerTokenTest {
             //Expire the token
             Time.advanceTime(ONE_DAY_MILLIS + 1);
 
-            //Verify the signature...
-            WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState);
-            try {
-                wta.getSignedPasswordFor(wt.get_info(), info);
-                fail("Expected an expired token to not be signed!!!");
-            } catch (IllegalArgumentException ia) {
-                //What we want...
+            try (WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState)) {
+                try {
+                    //Verify the signature...
+                    wta.getSignedPasswordFor(wt.get_info(), info);
+                    fail("Expected an expired token to not be signed!!!");
+                } catch (IllegalArgumentException ia) {
+                    //What we want...
+                }
             }
 
             //Verify if WorkerTokenManager recognizes the expired WorkerToken.


[storm] 02/04: STORM-3379: Fix intermittent NPE during worker boot in local mode

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit c985695e0728eb3af171ca6d346c51cc4e0b083a
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Sat Apr 13 20:33:29 2019 +0200

    STORM-3379: Fix intermittent NPE during worker boot in local mode
---
 .../jvm/org/apache/storm/daemon/worker/Worker.java |  26 +++--
 .../apache/storm/daemon/worker/WorkerState.java    |  45 +++++----
 .../auth/workertoken/WorkerTokenAuthorizer.java    |   2 +-
 .../org/apache/storm/utils/SupervisorClient.java   |   5 +-
 .../apache/storm/utils/SupervisorIfaceFactory.java |  30 ++++++
 .../apache/storm/daemon/supervisor/Container.java  |   4 +-
 .../storm/daemon/supervisor/ContainerLauncher.java |   7 +-
 .../storm/daemon/supervisor/LocalContainer.java    |  10 +-
 .../daemon/supervisor/LocalContainerLauncher.java  |   7 +-
 .../storm/daemon/supervisor/ReadClusterState.java  |   3 +-
 .../apache/storm/daemon/supervisor/Supervisor.java | 108 ++++++++++++---------
 .../nimbus/AssignmentDistributionService.java      |   2 +-
 12 files changed, 164 insertions(+), 85 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index e950c051..175a91a 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -14,6 +14,7 @@ package org.apache.storm.daemon.worker;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -22,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -45,6 +47,7 @@ import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.Supervisor;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.IContext;
@@ -61,6 +64,7 @@ import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.SupervisorIfaceFactory;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -87,6 +91,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     private AtomicReference<Credentials> credentialsAtom;
     private Subject subject;
     private Collection<IAutoCredentials> autoCreds;
+    private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
 
 
     /**
@@ -103,7 +108,7 @@ public class Worker implements Shutdownable, DaemonCommon {
      */
 
     public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId,
-                  int supervisorPort, int port, String workerId) {
+                  int supervisorPort, int port, String workerId, Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier) {
         this.conf = conf;
         this.context = context;
         this.topologyId = topologyId;
@@ -113,6 +118,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         this.workerId = workerId;
         this.logConfigManager = new LogConfigManager();
         this.metricRegistry = new StormMetricRegistry();
+        this.supervisorIfaceSupplier = supervisorIfaceSupplier;
     }
 
     public static void main(String[] args) throws Exception {
@@ -125,8 +131,16 @@ public class Worker implements Shutdownable, DaemonCommon {
         Map<String, Object> conf = ConfigUtils.readStormConfig();
         Utils.setupDefaultUncaughtExceptionHandler();
         StormCommon.validateDistributedMode(conf);
-        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort),
-                                   Integer.parseInt(portStr), workerId);
+        int supervisorPortInt = Integer.parseInt(supervisorPort);
+        Supplier<SupervisorIfaceFactory> supervisorIfaceSuppler = () -> {
+            try {
+                return SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPortInt);
+            } catch (UnknownHostException e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        };
+        Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt,
+                                   Integer.parseInt(portStr), workerId, supervisorIfaceSuppler);
         worker.start();
         int workerShutdownSleepSecs = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS));
         LOG.info("Adding shutdown hook with kill in {} secs", workerShutdownSleepSecs);
@@ -172,7 +186,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
                               Map<String, String> initCreds, Credentials initialCredentials)
         throws Exception {
-        workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId,
+        workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
                                       topologyConf, stateStorage, stormClusterState, autoCreds, metricRegistry);
 
         // Heartbeat here so that worker process dies if this fails
@@ -425,8 +439,8 @@ public class Worker implements Shutdownable, DaemonCommon {
         SupervisorWorkerHeartbeat workerHeartbeat = new SupervisorWorkerHeartbeat(lsWorkerHeartbeat.get_topology_id(),
                                                                                   lsWorkerHeartbeat.get_executors(),
                                                                                   lsWorkerHeartbeat.get_time_secs());
-        try (SupervisorClient client = SupervisorClient.getConfiguredClient(conf, Utils.hostname(), supervisorPort)) {
-            client.getClient().sendSupervisorWorkerHeartbeat(workerHeartbeat);
+        try (SupervisorIfaceFactory fac = supervisorIfaceSupplier.get()) {
+            fac.getIface().sendSupervisorWorkerHeartbeat(workerHeartbeat);
         } catch (Exception tr1) {
             //If any error/exception thrown, report directly to nimbus.
             LOG.warn("Exception when send heartbeat to local supervisor", tr1.getMessage());
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 2aa96a9..913261d 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -69,6 +69,7 @@ import org.apache.storm.serialization.ITupleSerializer;
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.shade.com.google.common.collect.Sets;
+import org.apache.storm.shade.org.apache.commons.lang.Validate;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
@@ -76,6 +77,7 @@ import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
+import org.apache.storm.utils.SupervisorIfaceFactory;
 import org.apache.storm.utils.ThriftTopologyUtils;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.utils.Utils.SmartThread;
@@ -92,7 +94,7 @@ public class WorkerState {
     final IConnection receiver;
     final String topologyId;
     final String assignmentId;
-    final int supervisorPort;
+    private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
     final int port;
     final String workerId;
     final IStateStorage stateStorage;
@@ -151,18 +153,18 @@ public class WorkerState {
     private final StormMetricRegistry metricRegistry;
 
     public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId,
-                       int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
+                       Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
                        IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials,
                        StormMetricRegistry metricRegistry) throws IOException,
         InvalidTopologyException {
         this.metricRegistry = metricRegistry;
         this.autoCredentials = autoCredentials;
         this.conf = conf;
+        this.supervisorIfaceSupplier = supervisorIfaceSupplier;
         this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
         this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
         this.topologyId = topologyId;
         this.assignmentId = assignmentId;
-        this.supervisorPort = supervisorPort;
         this.port = port;
         this.workerId = workerId;
         this.stateStorage = stateStorage;
@@ -370,9 +372,14 @@ public class WorkerState {
     public SmartThread makeTransferThread() {
         return workerTransfer.makeTransferThread();
     }
-
+    
     public void refreshConnections() {
-        Assignment assignment = getLocalAssignment(conf, stormClusterState, topologyId);
+        Assignment assignment = null;
+        try {
+            assignment = getLocalAssignment(stormClusterState, topologyId);
+        } catch (Exception e) {
+            LOG.warn("Failed to read assignment. This should only happen when topology is shutting down.", e);
+        }
 
         Set<NodeInfo> neededConnections = new HashSet<>();
         Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>();
@@ -393,6 +400,7 @@ public class WorkerState {
         Set<NodeInfo> newConnections = Sets.difference(neededConnections, currentConnections);
         Set<NodeInfo> removeConnections = Sets.difference(currentConnections, neededConnections);
 
+        Map<String, String> nodeHost = assignment != null ? assignment.get_node_host() : null;
         // Add new connections atomically
         cachedNodeToPortSocket.getAndUpdate(prev -> {
             Map<NodeInfo, IConnection> next = new HashMap<>(prev);
@@ -400,7 +408,8 @@ public class WorkerState {
                 next.put(nodeInfo,
                          mqContext.connect(
                              topologyId,
-                             assignment.get_node_host().get(nodeInfo.get_node()),    // Host
+                             //nodeHost is not null here, as newConnections is only non-empty if assignment was not null above.
+                             nodeHost.get(nodeInfo.get_node()),    // Host
                              nodeInfo.get_port().iterator().next().intValue(),       // Port
                              workerTransfer.getRemoteBackPressureStatus()));
             }
@@ -625,7 +634,8 @@ public class WorkerState {
         LOG.info("Reading assignments");
         List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
         executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
-        Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
+        Map<List<Long>, NodeInfo> executorToNodePort = 
+            getLocalAssignment(stormClusterState, topologyId).get_executor_node_port();
         for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
             NodeInfo nodeInfo = entry.getValue();
             if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
@@ -635,18 +645,17 @@ public class WorkerState {
         return executorsAssignedToThisWorker;
     }
 
-    private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) {
-        if (!ConfigUtils.isLocalMode(conf)) {
-            try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(),
-                                                                                          supervisorPort)) {
-                Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId);
-                return assignment;
-            } catch (Throwable tr1) {
+    private Assignment getLocalAssignment(IStormClusterState stormClusterState, String topologyId) {
+        try (SupervisorIfaceFactory fac = supervisorIfaceSupplier.get()) {
+            return fac.getIface().getLocalAssignmentForStorm(topologyId);
+        } catch (Throwable e) {
                 //if any error/exception thrown, fetch it from zookeeper
-                return stormClusterState.remoteAssignmentInfo(topologyId, null);
+            Assignment assignment = stormClusterState.remoteAssignmentInfo(topologyId, null);
+            if (assignment == null) {
+                throw new RuntimeException("Failed to read worker assignment."
+                    + " Supervisor client threw exception, and assignment in Zookeeper was null", e);
             }
-        } else {
-            return stormClusterState.remoteAssignmentInfo(topologyId, null);
+            return assignment;
         }
     }
 
@@ -666,7 +675,7 @@ public class WorkerState {
         for (List<Long> executor : executors) {
             int port = this.getPort();
             receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
-                recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
+                                                      recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
                 this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort(), metricRegistry));
 
         }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
index f321221..c225e27 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -152,4 +152,4 @@ public class WorkerTokenAuthorizer implements PasswordProvider, Closeable {
             state.disconnect();
         }
     }
-}
\ No newline at end of file
+}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java b/storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java
index 2364777..64d5ace 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/SupervisorClient.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
  * <li>nimbus -> supervisor: assign assignments for a node.</li>
  * </ul>
  */
-public class SupervisorClient extends ThriftClient {
+public class SupervisorClient extends ThriftClient implements SupervisorIfaceFactory {
     private static final Logger LOG = LoggerFactory.getLogger(SupervisorClient.class);
     private Supervisor.Client client;
 
@@ -76,7 +76,8 @@ public class SupervisorClient extends ThriftClient {
         }
     }
 
-    public Supervisor.Client getClient() {
+    @Override
+    public Supervisor.Client getIface() {
         return client;
     }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/utils/SupervisorIfaceFactory.java b/storm-client/src/jvm/org/apache/storm/utils/SupervisorIfaceFactory.java
new file mode 100644
index 0000000..eddfa73
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/SupervisorIfaceFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.io.Closeable;
+
+public interface SupervisorIfaceFactory extends Closeable {
+
+    org.apache.storm.generated.Supervisor.Iface getIface();
+
+    @Override
+    default void close() {
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index 8ad5936..8b58483 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -309,7 +309,7 @@ public abstract class Container implements Killable {
         }
         return ret;
     }
-
+    
     @Override
     public boolean areAllProcessesDead() throws IOException {
         Set<Long> pids = getAllPids();
@@ -325,7 +325,7 @@ public abstract class Container implements Killable {
                 break;
             }
         }
-
+        
         if (allDead && shutdownTimer != null) {
             shutdownTimer.stop();
             shutdownTimer = null;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
index 7df2036..b310018 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ContainerLauncher.java
@@ -46,14 +46,17 @@ public abstract class ContainerLauncher {
      * @param sharedContext Used in local mode to let workers talk together without netty
      * @param metricsRegistry The metrics registry.
      * @param containerMemoryTracker The shared memory tracker for the supervisor's containers
+     * @param localSupervisor The local supervisor Thrift interface. Only used for local clusters, distributed clusters use Thrift directly.
      * @return the proper container launcher
      * @throws IOException on any error
      */
     public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, int supervisorPort,
                                          IContext sharedContext, StormMetricsRegistry metricsRegistry, 
-                                         ContainerMemoryTracker containerMemoryTracker) throws IOException {
+                                         ContainerMemoryTracker containerMemoryTracker,
+                                         org.apache.storm.generated.Supervisor.Iface localSupervisor) throws IOException {
         if (ConfigUtils.isLocalMode(conf)) {
-            return new LocalContainerLauncher(conf, supervisorId, supervisorPort, sharedContext, metricsRegistry, containerMemoryTracker);
+            return new LocalContainerLauncher(conf, supervisorId, supervisorPort, sharedContext, metricsRegistry, containerMemoryTracker,
+                localSupervisor);
         }
 
         ResourceIsolationInterface resourceIsolationManager = null;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
index 1a5fd82..228da84 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainer.java
@@ -27,15 +27,18 @@ import org.slf4j.LoggerFactory;
 public class LocalContainer extends Container {
     private static final Logger LOG = LoggerFactory.getLogger(LocalContainer.class);
     private final IContext _sharedContext;
+    private final org.apache.storm.generated.Supervisor.Iface localSupervisor;
     private volatile boolean _isAlive = false;
 
     public LocalContainer(Map<String, Object> conf, String supervisorId, int supervisorPort, int port,
                           LocalAssignment assignment, IContext sharedContext, StormMetricsRegistry metricsRegistry,
-                          ContainerMemoryTracker containerMemoryTracker) throws IOException {
+                          ContainerMemoryTracker containerMemoryTracker,
+                          org.apache.storm.generated.Supervisor.Iface localSupervisor) throws IOException {
         super(ContainerType.LAUNCH, conf, supervisorId, supervisorPort, port, assignment, null, null, null, null, metricsRegistry, 
             containerMemoryTracker);
         _sharedContext = sharedContext;
         _workerId = Utils.uuid();
+        this.localSupervisor = localSupervisor;
     }
 
     @Override
@@ -50,7 +53,10 @@ public class LocalContainer extends Container {
 
     @Override
     public void launch() throws IOException {
-        Worker worker = new Worker(_conf, _sharedContext, _topologyId, _supervisorId, _supervisorPort, _port, _workerId);
+        Worker worker = new Worker(_conf, _sharedContext, _topologyId, _supervisorId, _supervisorPort, _port, _workerId,
+            () -> {
+                return () -> localSupervisor;
+            });
         try {
             worker.start();
         } catch (Exception e) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
index 77b04d4..d9f3f8d 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/LocalContainerLauncher.java
@@ -29,22 +29,25 @@ public class LocalContainerLauncher extends ContainerLauncher {
     private final IContext _sharedContext;
     private final StormMetricsRegistry metricsRegistry;
     private final ContainerMemoryTracker containerMemoryTracker;
+    private final org.apache.storm.generated.Supervisor.Iface localSupervisor;
 
     public LocalContainerLauncher(Map<String, Object> conf, String supervisorId, int supervisorPort,
                                   IContext sharedContext, StormMetricsRegistry metricsRegistry, 
-                                  ContainerMemoryTracker containerMemoryTracker) {
+                                  ContainerMemoryTracker containerMemoryTracker,
+                                  org.apache.storm.generated.Supervisor.Iface localSupervisor) {
         _conf = conf;
         _supervisorId = supervisorId;
         _supervisorPort = supervisorPort;
         _sharedContext = sharedContext;
         this.metricsRegistry = metricsRegistry;
         this.containerMemoryTracker = containerMemoryTracker;
+        this.localSupervisor = localSupervisor;
     }
 
     @Override
     public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
         LocalContainer ret = new LocalContainer(_conf, _supervisorId, _supervisorPort,
-            port, assignment, _sharedContext, metricsRegistry, containerMemoryTracker);
+            port, assignment, _sharedContext, metricsRegistry, containerMemoryTracker, localSupervisor);
         ret.setup();
         ret.launch();
         return ret;
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 46b0e60..6b18bbe 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -86,7 +86,8 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         this.slotMetrics = supervisor.getSlotMetrics();
 
         this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisorPort,
-            supervisor.getSharedContext(), supervisor.getMetricsRegistry(), supervisor.getContainerMemoryTracker());
+            supervisor.getSharedContext(), supervisor.getMetricsRegistry(), supervisor.getContainerMemoryTracker(),
+            supervisor.getSupervisorThriftInterface());
 
         this.metricsProcessor = null;
         try {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index ed79c35..9bc4f7f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -112,6 +112,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private ThriftServer thriftServer;
     //used for local cluster heartbeating
     private Nimbus.Iface localNimbus;
+    //Passed to workers in local clusters, exposed by thrift server in distributed mode
+    private org.apache.storm.generated.Supervisor.Iface supervisorThriftInterface;
 
     private Supervisor(ISupervisor iSupervisor, StormMetricsRegistry metricsRegistry)
         throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException {
@@ -178,6 +180,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         this.workerHeartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
 
         this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+        
+        this.supervisorThriftInterface = createSupervisorIface();
     }
 
     /**
@@ -393,6 +397,59 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         }
     }
 
+    private org.apache.storm.generated.Supervisor.Iface createSupervisorIface() {
+        return new org.apache.storm.generated.Supervisor.Iface() {
+            @Override
+            public void sendSupervisorAssignments(SupervisorAssignments assignments)
+                throws AuthorizationException, TException {
+                checkAuthorization("sendSupervisorAssignments");
+                LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
+                SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
+                    getReadClusterState());
+                getEventManger().add(syn);
+            }
+
+            @Override
+            public Assignment getLocalAssignmentForStorm(String id)
+                throws NotAliveException, AuthorizationException, TException {
+                Map<String, Object> topoConf = null;
+                try {
+                    topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
+                } catch (IOException e) {
+                    LOG.warn("Topology config is not localized yet...");
+                }
+                checkAuthorization(id, topoConf, "getLocalAssignmentForStorm");
+                Assignment assignment = getStormClusterState().assignmentInfo(id, null);
+                if (null == assignment) {
+                    throw new WrappedNotAliveException("No local assignment assigned for storm: "
+                        + id
+                        + " for node: "
+                        + getHostName());
+                }
+                return assignment;
+            }
+
+            @Override
+            public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat)
+                throws AuthorizationException, NotAliveException, TException {
+                // do nothing except validate heartbeat for now.
+                String id = heartbeat.get_storm_id();
+                Map<String, Object> topoConf = null;
+                try {
+                    topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
+                } catch (IOException e) {
+                    LOG.warn("Topology config is not localized yet...");
+                    throw new WrappedNotAliveException(id + " does not appear to be alive, you should probably exit");
+                }
+                checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat");
+            }
+        };
+    }
+
+    public org.apache.storm.generated.Supervisor.Iface getSupervisorThriftInterface() {
+        return supervisorThriftInterface;
+    }
+    
     private void launchSupervisorThriftServer(Map<String, Object> conf) throws IOException {
         // validate port
         int port = getThriftServerPort();
@@ -404,53 +461,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
             throw new RuntimeException(e);
         }
 
-        TProcessor processor = new org.apache.storm.generated.Supervisor.Processor(
-            new org.apache.storm.generated.Supervisor.Iface() {
-                @Override
-                public void sendSupervisorAssignments(SupervisorAssignments assignments)
-                    throws AuthorizationException, TException {
-                    checkAuthorization("sendSupervisorAssignments");
-                    LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments);
-                    SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments,
-                                                                            getReadClusterState());
-                    getEventManger().add(syn);
-                }
-
-                @Override
-                public Assignment getLocalAssignmentForStorm(String id)
-                    throws NotAliveException, AuthorizationException, TException {
-                    Map<String, Object> topoConf = null;
-                    try {
-                        topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
-                    } catch (IOException e) {
-                        LOG.warn("Topology config is not localized yet...");
-                    }
-                    checkAuthorization(id, topoConf, "getLocalAssignmentForStorm");
-                    Assignment assignment = getStormClusterState().assignmentInfo(id, null);
-                    if (null == assignment) {
-                        throw new WrappedNotAliveException("No local assignment assigned for storm: "
-                                                    + id
-                                                    + " for node: "
-                                                    + getHostName());
-                    }
-                    return assignment;
-                }
-
-                @Override
-                public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat)
-                    throws AuthorizationException, NotAliveException, TException {
-                    // do nothing except validate heartbeat for now.
-                    String id = heartbeat.get_storm_id();
-                    Map<String, Object> topoConf = null;
-                    try {
-                        topoConf = ConfigUtils.readSupervisorStormConf(conf, id);
-                    } catch (IOException e) {
-                        LOG.warn("Topology config is not localized yet...");
-                        throw new WrappedNotAliveException(id + " does not appear to be alive, you should probably exit");
-                    }
-                    checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat");
-                }
-            });
+        TProcessor processor = new org.apache.storm.generated.Supervisor.Processor<>(supervisorThriftInterface);
         this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR);
         this.thriftServer.serve();
     }
@@ -536,7 +547,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         } else {
             try {
                 ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getThriftServerPort(),
-                                                                    getSharedContext(), getMetricsRegistry(), getContainerMemoryTracker());
+                                                                    getSharedContext(), getMetricsRegistry(), getContainerMemoryTracker(),
+                                                                    supervisorThriftInterface);
                 killWorkers(SupervisorUtils.supervisorWorkerIds(conf), launcher);
             } catch (Exception e) {
                 throw Utils.wrapInRuntime(e);
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index b912513..05cb1df 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -288,7 +288,7 @@ public class AssignmentDistributionService implements Closeable {
                 try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(),
                                                                                     assignments.getHost(), assignments.getServerPort())) {
                     try {
-                        client.getClient().sendSupervisorAssignments(assignments.getAssignments());
+                        client.getIface().sendSupervisorAssignments(assignments.getAssignments());
                     } catch (Exception e) {
                         //just ignore the exception.
                         LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());


[storm] 03/04: Merge branch 'STORM-3376' of https://github.com/srdo/storm into asfgit-master

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit 2bf972e050af2356940f2b9ce59af6d660f49766
Merge: 8951e7a aaf1113
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Sun May 5 16:12:55 2019 +0200

    Merge branch 'STORM-3376' of https://github.com/srdo/storm into asfgit-master

 integration-test/config/storm.yaml                 |  6 ++
 .../test/org/apache/storm/sql/TestStormSql.java    | 45 +++++-----
 .../backends/streams/TestPlanCompiler.java         | 96 ++++++++++-----------
 .../src/test/org/apache/storm/sql/TestUtils.java   | 23 ++---
 .../org/apache/storm/cluster/IStateStorage.java    |  4 +-
 .../storm/cluster/StormClusterStateImpl.java       |  8 +-
 .../jvm/org/apache/storm/daemon/worker/Worker.java |  4 +-
 .../apache/storm/daemon/worker/WorkerState.java    | 40 ++++-----
 .../org/apache/storm/messaging/IConnection.java    | 14 ----
 .../jvm/org/apache/storm/messaging/IContext.java   |  5 +-
 .../org/apache/storm/messaging/local/Context.java  | 98 ++++++++++------------
 .../org/apache/storm/messaging/netty/Client.java   | 15 ----
 .../org/apache/storm/messaging/netty/Context.java  |  6 +-
 .../org/apache/storm/messaging/netty/Server.java   | 29 +++----
 .../apache/storm/security/auth/ThriftServer.java   |  8 +-
 .../auth/digest/DigestSaslTransportPlugin.java     | 11 ++-
 .../auth/kerberos/KerberosSaslTransportPlugin.java | 11 ++-
 .../security/auth/sasl/SaslTransportPlugin.java    | 11 ++-
 .../auth/workertoken/WorkerTokenAuthorizer.java    | 13 ++-
 .../apache/storm/messaging/netty/NettyTest.java    | 28 +++----
 .../main/java/org/apache/storm/LocalCluster.java   |  6 +-
 .../apache/storm/blobstore/LocalFsBlobStore.java   |  1 +
 .../org/apache/storm/daemon/supervisor/Slot.java   |  2 +-
 .../org/apache/storm/localizer/AsyncLocalizer.java |  4 +-
 .../security/auth/workertoken/WorkerTokenTest.java | 24 +++---
 25 files changed, 247 insertions(+), 265 deletions(-)


[storm] 04/04: Merge branch 'STORM-3379' of https://github.com/srdo/storm into asfgit-master

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git

commit 26e6c06c4b94aa121457889b1bd22c74fa41b3f7
Merge: 2bf972e c985695
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Sun May 5 16:13:19 2019 +0200

    Merge branch 'STORM-3379' of https://github.com/srdo/storm into asfgit-master

 .../jvm/org/apache/storm/daemon/worker/Worker.java |  26 +++--
 .../apache/storm/daemon/worker/WorkerState.java    |  45 +++++----
 .../auth/workertoken/WorkerTokenAuthorizer.java    |   2 +-
 .../org/apache/storm/utils/SupervisorClient.java   |   5 +-
 .../apache/storm/utils/SupervisorIfaceFactory.java |  30 ++++++
 .../apache/storm/daemon/supervisor/Container.java  |   4 +-
 .../storm/daemon/supervisor/ContainerLauncher.java |   7 +-
 .../storm/daemon/supervisor/LocalContainer.java    |  10 +-
 .../daemon/supervisor/LocalContainerLauncher.java  |   7 +-
 .../storm/daemon/supervisor/ReadClusterState.java  |   3 +-
 .../apache/storm/daemon/supervisor/Supervisor.java | 108 ++++++++++++---------
 .../nimbus/AssignmentDistributionService.java      |   2 +-
 12 files changed, 164 insertions(+), 85 deletions(-)