You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/05/05 14:14:08 UTC
[storm] 01/04: STORM-3376: Set Server callback before opening Netty
socket, so we don't drop messages during startup
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
commit aaf1113360ce151d86948860c2f290befa79abb8
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Fri Apr 12 20:25:42 2019 +0200
STORM-3376: Set Server callback before opening Netty socket, so we don't drop messages during startup
---
integration-test/config/storm.yaml | 6 ++
.../test/org/apache/storm/sql/TestStormSql.java | 45 +++++-----
.../backends/streams/TestPlanCompiler.java | 96 ++++++++++-----------
.../src/test/org/apache/storm/sql/TestUtils.java | 23 ++---
.../org/apache/storm/cluster/IStateStorage.java | 4 +-
.../storm/cluster/StormClusterStateImpl.java | 8 +-
.../jvm/org/apache/storm/daemon/worker/Worker.java | 4 +-
.../apache/storm/daemon/worker/WorkerState.java | 40 ++++-----
.../org/apache/storm/messaging/IConnection.java | 14 ----
.../jvm/org/apache/storm/messaging/IContext.java | 5 +-
.../org/apache/storm/messaging/local/Context.java | 98 ++++++++++------------
.../org/apache/storm/messaging/netty/Client.java | 15 ----
.../org/apache/storm/messaging/netty/Context.java | 6 +-
.../org/apache/storm/messaging/netty/Server.java | 29 +++----
.../apache/storm/security/auth/ThriftServer.java | 8 +-
.../auth/digest/DigestSaslTransportPlugin.java | 11 ++-
.../auth/kerberos/KerberosSaslTransportPlugin.java | 11 ++-
.../security/auth/sasl/SaslTransportPlugin.java | 11 ++-
.../auth/workertoken/WorkerTokenAuthorizer.java | 13 ++-
.../apache/storm/messaging/netty/NettyTest.java | 28 +++----
.../main/java/org/apache/storm/LocalCluster.java | 6 +-
.../apache/storm/blobstore/LocalFsBlobStore.java | 1 +
.../org/apache/storm/daemon/supervisor/Slot.java | 2 +-
.../org/apache/storm/localizer/AsyncLocalizer.java | 4 +-
.../security/auth/workertoken/WorkerTokenTest.java | 24 +++---
25 files changed, 247 insertions(+), 265 deletions(-)
diff --git a/integration-test/config/storm.yaml b/integration-test/config/storm.yaml
index 67784e7..774497f 100644
--- a/integration-test/config/storm.yaml
+++ b/integration-test/config/storm.yaml
@@ -35,3 +35,9 @@ drpc.servers:
- "node1"
supervisor.slots.ports: [6700, 6701, 6702, 6703, 6704, 6705, 6706, 6707, 6708, 6709]
+
+# Enable assertions
+nimbus.childopts: "-Xmx1024m -ea"
+supervisor.childopts: "-Xmx256m -ea"
+worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump -ea"
+ui.childopts: "-Xmx768m -ea"
\ No newline at end of file
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 5a30764..3958894 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -29,28 +29,24 @@ import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.streams.Pair;
import org.apache.storm.tuple.Values;
-import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
-
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(TestUtils.MockBoltExtension.class)
+@ExtendWith(TestUtils.MockInsertBoltExtension.class)
public class TestStormSql {
public static final int WAIT_TIMEOUT_MS = 1000 * 1000;
public static final int WAIT_TIMEOUT_MS_NO_RECORDS_EXPECTED = 1000 * 10;
public static final int WAIT_TIMEOUT_MS_ERROR_EXPECTED = 1000;
- @Rule
- public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
-
- @Rule
- public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
-
private static LocalCluster cluster;
- @BeforeClass
+ @BeforeAll
public static void staticSetup() throws Exception {
DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
DataSourcesRegistry.providerMap().put("mocknested", new MockNestedDataSourceProvider());
@@ -61,7 +57,7 @@ public class TestStormSql {
cluster = new LocalCluster();
}
- @AfterClass
+ @AfterAll
public static void staticCleanup() {
DataSourcesRegistry.providerMap().remove("mock");
DataSourcesRegistry.providerMap().remove("mocknested");
@@ -176,7 +172,7 @@ public class TestStormSql {
Assert.assertEquals(0, values.size());
}
- @Test(expected = ValidationException.class)
+ @Test
public void testExternalUdfType() throws Exception {
List<String> stmt = new ArrayList<>();
stmt.add("CREATE EXTERNAL TABLE FOO (ID INT, NAME VARCHAR) LOCATION 'mock:///foo'");
@@ -185,12 +181,11 @@ public class TestStormSql {
stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(NAME, 1) FROM FOO WHERE ID = 0");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
- Assert.fail("Should raise ValidationException.");
+ Assertions.assertThrows(ValidationException.class,
+ () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
}
- @Test(expected = CompilingClassLoader.CompilerException.class)
+ @Test
public void testExternalUdfType2() throws Exception {
List<String> stmt = new ArrayList<>();
// generated code will be not compilable since return type of MYPLUS and type of 'x' are different
@@ -200,9 +195,8 @@ public class TestStormSql {
stmt.add("INSERT INTO BAR SELECT STREAM ID FROM FOO WHERE MYPLUS(ID, 1) = 'x'");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
- Assert.fail("Should raise CompilerException.");
+ Assertions.assertThrows(CompilingClassLoader.CompilerException.class,
+ () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
}
@Test
@@ -222,7 +216,7 @@ public class TestStormSql {
Assert.assertEquals(5, values.get(1).getFirst());
}
- @Test(expected = UnsupportedOperationException.class)
+ @Test
public void testExternalUdfUsingJar() throws Exception {
List<String> stmt = new ArrayList<>();
stmt.add("CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'mock:///foo'");
@@ -231,9 +225,8 @@ public class TestStormSql {
stmt.add("INSERT INTO BAR SELECT STREAM MYPLUS(ID, 1) FROM FOO WHERE ID > 2");
StormSqlLocalClusterImpl impl = new StormSqlLocalClusterImpl();
- impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED);
-
- Assert.fail("Should raise UnsupportedOperationException.");
+ Assertions.assertThrows(UnsupportedOperationException.class,
+ () -> impl.runLocal(cluster, stmt, (__) -> true, WAIT_TIMEOUT_MS_ERROR_EXPECTED));
}
private static class MockDataSourceProvider implements DataSourcesProvider {
diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
index 68203d8..21bc6f3 100644
--- a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
+++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/streams/TestPlanCompiler.java
@@ -33,35 +33,31 @@ import org.apache.storm.sql.planner.streams.QueryPlanner;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.streams.Pair;
import org.apache.storm.tuple.Values;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExternalResource;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import org.junit.Assert;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+@ExtendWith(TestUtils.MockBoltExtension.class)
+@ExtendWith(TestUtils.MockInsertBoltExtension.class)
public class TestPlanCompiler {
- private static LocalCluster cluster;
- @Rule
- public ExternalResource mockBoltValues = TestUtils.mockBoltValueResource;
-
- @Rule
- public ExternalResource mockInsertBoltValues = TestUtils.mockInsertBoltValueResource;
+ private static LocalCluster cluster;
- @BeforeClass
+ @BeforeAll
public static void staticSetup() throws Exception {
cluster = new LocalCluster();
}
- @AfterClass
+ @AfterAll
public static void staticCleanup() {
- if (cluster!= null) {
+ if (cluster != null) {
cluster.shutdown();
cluster = null;
}
@@ -82,7 +78,7 @@ public class TestPlanCompiler {
final StormTopology topo = proc.build();
SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
- Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, TestUtils.MockBolt.getCollectedValues().toArray());
+ Assert.assertArrayEquals(new Values[]{new Values(3), new Values(4)}, TestUtils.MockBolt.getCollectedValues().toArray());
}
@Test
@@ -99,15 +95,15 @@ public class TestPlanCompiler {
final StormTopology topo = proc.build();
SqlTestUtil.runStormTopology(cluster, TestUtils.MockInsertBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
- Assert.assertArrayEquals(new Pair[] { Pair.of(4, new Values(4, "abcde", "y")) }, TestUtils.MockInsertBolt.getCollectedValues().toArray());
+ Assert.assertArrayEquals(new Pair[]{Pair.of(4, new Values(4, "abcde", "y"))}, TestUtils.MockInsertBolt.getCollectedValues().toArray());
}
@Test
public void testUdf() throws Exception {
int EXPECTED_VALUE_SIZE = 1;
- String sql = "SELECT MYPLUS(ID, 3)" +
- "FROM FOO " +
- "WHERE ID = 2";
+ String sql = "SELECT MYPLUS(ID, 3)"
+ + "FROM FOO "
+ + "WHERE ID = 2";
TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
Map<String, ISqlStreamsDataSource> data = new HashMap<>();
data.put("FOO", new TestUtils.MockSqlStreamsDataSource());
@@ -119,15 +115,15 @@ public class TestPlanCompiler {
final StormTopology topo = proc.build();
SqlTestUtil.runStormTopology(cluster, TestUtils.MockBolt.getCollectedValues(), EXPECTED_VALUE_SIZE, proc, topo);
- Assert.assertArrayEquals(new Values[] { new Values(5) }, TestUtils.MockBolt.getCollectedValues().toArray());
+ Assert.assertArrayEquals(new Values[]{new Values(5)}, TestUtils.MockBolt.getCollectedValues().toArray());
}
@Test
public void testNested() throws Exception {
int EXPECTED_VALUE_SIZE = 1;
- String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
- "FROM FOO " +
- "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+ String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD "
+ + "FROM FOO "
+ + "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql);
final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -144,23 +140,23 @@ public class TestPlanCompiler {
Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))},
- TestUtils.MockBolt.getCollectedValues().toArray());
+ TestUtils.MockBolt.getCollectedValues().toArray());
}
/**
- * All the binary literal tests are done here, because Avatica converts the result to byte[]
- * whereas Stream provides the result to ByteString which makes different semantic from Stream implementation.
+ * All the binary literal tests are done here, because Avatica converts the result to byte[] whereas Stream provides the result to
+ * ByteString which makes different semantic from Stream implementation.
*/
@Test
public void testBinaryStringFunctions() throws Exception {
int EXPECTED_VALUE_SIZE = 1;
- String sql = "SELECT x'45F0AB' || x'45F0AB', " +
- "POSITION(x'F0' IN x'453423F0ABBC'), " +
- "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), " +
- "SUBSTRING(x'453423F0ABBC' FROM 3), " +
- "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) " +
- "FROM FOO " +
- "WHERE ID > 0 AND ID < 2";
+ String sql = "SELECT x'45F0AB' || x'45F0AB', "
+ + "POSITION(x'F0' IN x'453423F0ABBC'), "
+ + "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), "
+ + "SUBSTRING(x'453423F0ABBC' FROM 3), "
+ + "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) "
+ + "FROM FOO "
+ + "WHERE ID > 0 AND ID < 2";
TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -190,19 +186,19 @@ public class TestPlanCompiler {
@Test
public void testDateKeywordsAndFunctions() throws Exception {
int EXPECTED_VALUE_SIZE = 1;
- String sql = "SELECT " +
- "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, " +
- "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, " +
- "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')," +
- "FLOOR(DATE '2016-01-23' TO MONTH)," +
- "CEIL(TIME '12:34:56' TO MINUTE)," +
- "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP," +
- "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}," +
- "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}," +
- "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, " +
- "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field " +
- "FROM FOO " +
- "WHERE ID > 0 AND ID < 2";
+ String sql = "SELECT "
+ + "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, "
+ + "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, "
+ + "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56'),"
+ + "FLOOR(DATE '2016-01-23' TO MONTH),"
+ + "CEIL(TIME '12:34:56' TO MINUTE),"
+ + "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP,"
+ + "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')},"
+ + "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')},"
+ + "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, "
+ + "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field "
+ + "FROM FOO "
+ + "WHERE ID > 0 AND ID < 2";
TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
final Map<String, ISqlStreamsDataSource> data = new HashMap<>();
@@ -229,7 +225,7 @@ public class TestPlanCompiler {
int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY);
Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt,
- 134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)},
- TestUtils.MockBolt.getCollectedValues().toArray());
+ 134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)},
+ TestUtils.MockBolt.getCollectedValues().toArray());
}
-}
\ No newline at end of file
+}
diff --git a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index f728bf6..a1aa552 100644
--- a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -39,22 +39,23 @@ import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
-import org.junit.rules.ExternalResource;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
public class TestUtils {
- public static final ExternalResource mockInsertBoltValueResource = new ExternalResource() {
- @Override
- protected void before() throws Throwable {
- MockInsertBolt.getCollectedValues().clear();
+ public static final class MockInsertBoltExtension implements BeforeEachCallback {
+ @Override
+ public void beforeEach(ExtensionContext ctx) throws Exception {
+ MockInsertBolt.getCollectedValues().clear();
+ }
}
- };
- public static final ExternalResource mockBoltValueResource = new ExternalResource() {
- @Override
- protected void before() throws Throwable {
- MockBolt.getCollectedValues().clear();
+ public static final class MockBoltExtension implements BeforeEachCallback {
+ @Override
+ public void beforeEach(ExtensionContext ctx) throws Exception {
+ MockBolt.getCollectedValues().clear();
+ }
+ }
- };
public static class MyPlus {
public static Integer evaluate(Integer x, Integer y) {
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
index 0889a26..b673932 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -12,6 +12,7 @@
package org.apache.storm.cluster;
+import java.io.Closeable;
import java.util.List;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener;
@@ -27,7 +28,7 @@ import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
* For example, performing these two calls: set_data("/path", data, acls); void set_worker_hb("/path", heartbeat, acls); may or may not
* cause a collision in "/path". Never use the same paths with the *_hb* methods as you do with the others.
*/
-public interface IStateStorage {
+public interface IStateStorage extends Closeable {
/**
* Registers a callback function that gets called when CuratorEvents happen.
@@ -115,6 +116,7 @@ public interface IStateStorage {
/**
* Close the connection to the data store.
*/
+ @Override
void close();
/**
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 644f465..f330278 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -56,7 +56,7 @@ public class StormClusterStateImpl implements IStormClusterState {
private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
private final List<ACL> defaultAcls;
private final String stateId;
- private final boolean solo;
+ private final boolean shouldCloseStateStorageOnDisconnect;
private final ClusterStateContext context;
private IStateStorage stateStorage;
private ILocalAssignmentsBackend assignmentsBackend;
@@ -74,10 +74,10 @@ public class StormClusterStateImpl implements IStormClusterState {
private ConcurrentHashMap<String, Runnable> logConfigCallback;
public StormClusterStateImpl(IStateStorage StateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend,
- ClusterStateContext context, boolean solo) throws Exception {
+ ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception {
this.stateStorage = StateStorage;
- this.solo = solo;
+ this.shouldCloseStateStorageOnDisconnect = shouldCloseStateStorageOnDisconnect;
this.defaultAcls = context.getDefaultZkAcls();
this.context = context;
this.assignmentsBackend = assignmentsassignmentsBackend;
@@ -831,7 +831,7 @@ public class StormClusterStateImpl implements IStormClusterState {
@Override
public void disconnect() {
stateStorage.unregister(stateId);
- if (solo) {
+ if (shouldCloseStateStorageOnDisconnect) {
stateStorage.close();
this.assignmentsBackend.close();
}
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 05a79c6..e950c051 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -197,9 +197,7 @@ public class Worker implements Shutdownable, DaemonCommon {
.scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
Worker.this::doExecutorHeartbeats);
- workerState.registerCallbacks();
-
- workerState.refreshConnections(null);
+ workerState.refreshConnections();
workerState.activateWorkerWhenAllConnectionsReady();
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 0c13bec..2aa96a9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.StormTimer;
@@ -57,6 +58,7 @@ import org.apache.storm.hooks.IWorkerHook;
import org.apache.storm.messaging.ConnectionWithStatus;
import org.apache.storm.messaging.DeserializingConnectionCallback;
import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.TransportFactory;
import org.apache.storm.messaging.netty.BackPressureStatus;
@@ -158,7 +160,6 @@ public class WorkerState {
this.conf = conf;
this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf);
- this.receiver = this.mqContext.bind(topologyId, port);
this.topologyId = topologyId;
this.assignmentId = assignmentId;
this.supervisorPort = supervisorPort;
@@ -215,6 +216,16 @@ public class WorkerState {
this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);
this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue);
this.deserializedWorkerHooks = deserializeWorkerHooks();
+ LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
+ IConnectionCallback cb = new DeserializingConnectionCallback(topologyConf,
+ getWorkerTopologyContext(),
+ this::transferLocalBatch);
+ Supplier<Object> newConnectionResponse = () -> {
+ BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+ LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
+ return bpStatus;
+ };
+ this.receiver = this.mqContext.bind(topologyId, port, cb, newConnectionResponse);
}
private static double getQueueLoad(JCQueue q) {
@@ -356,19 +367,11 @@ public class WorkerState {
return userTimer;
}
- public void refreshConnections() {
- try {
- refreshConnections(() -> refreshConnectionsTimer.schedule(0, this::refreshConnections));
- } catch (Exception e) {
- throw Utils.wrapInRuntime(e);
- }
- }
-
public SmartThread makeTransferThread() {
return workerTransfer.makeTransferThread();
}
- public void refreshConnections(Runnable callback) throws Exception {
+ public void refreshConnections() {
Assignment assignment = getLocalAssignment(conf, stormClusterState, topologyId);
Set<NodeInfo> neededConnections = new HashSet<>();
@@ -497,21 +500,6 @@ public class WorkerState {
);
}
- public void registerCallbacks() {
- LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
- receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
- getWorkerTopologyContext(),
- this::transferLocalBatch));
- // Send curr BackPressure status to new clients
- receiver.registerNewConnectionResponse(
- () -> {
- BackPressureStatus bpStatus = bpTracker.getCurrStatus();
- LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
- return bpStatus;
- }
- );
- }
-
/* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
@@ -576,7 +564,7 @@ public class WorkerState {
}
}
- public WorkerTopologyContext getWorkerTopologyContext() {
+ public final WorkerTopologyContext getWorkerTopologyContext() {
try {
String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, topologyId));
String pidDir = ConfigUtils.workerPidsRoot(conf, topologyId);
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
index c2e156c..f713c7f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -22,20 +22,6 @@ import org.apache.storm.messaging.netty.BackPressureStatus;
public interface IConnection extends AutoCloseable {
/**
- * Register a callback to be notified when data is ready to be processed.
- *
- * @param cb the callback to process the messages.
- */
- void registerRecv(IConnectionCallback cb);
-
- /**
- * Register a response generator to be used to send an initial response when a new client connects.
- *
- * @param cb the callback to process the connection.
- */
- void registerNewConnectionResponse(Supplier<Object> cb);
-
- /**
* Send load metrics to all downstream connections.
*
* @param taskToLoad a map from the task id to the load for that task.
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index 8d2c0dc..057ae30 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -14,6 +14,7 @@ package org.apache.storm.messaging;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
/**
* This interface needs to be implemented for messaging plugin.
@@ -41,9 +42,11 @@ public interface IContext {
*
* @param storm_id topology ID
* @param port port #
+ * @param cb The callback to deliver received messages to
+ * @param newConnectionResponse Supplier of the initial message to send to new client connections
* @return server side connection
*/
- IConnection bind(String storm_id, int port);
+ IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse);
/**
* This method establish a client side connection to a remote server
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
index 6071cbe..2737dfb 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -15,6 +15,7 @@ package org.apache.storm.messaging.local;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -37,36 +38,36 @@ import org.slf4j.LoggerFactory;
public class Context implements IContext {
private static final Logger LOG = LoggerFactory.getLogger(Context.class);
- private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
-
- private static LocalServer getLocalServer(String nodeId, int port) {
- String key = nodeId + "-" + port;
- LocalServer ret = _registry.get(key);
- if (ret == null) {
- ret = new LocalServer(port);
- LocalServer tmp = _registry.putIfAbsent(key, ret);
- if (tmp != null) {
- ret = tmp;
- }
+ private final ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
+
+ private static String getNodeKey(String nodeId, int port) {
+ return nodeId + "-" + port;
+ }
+
+ private LocalServer createLocalServer(String nodeId, int port, IConnectionCallback cb) {
+ String key = getNodeKey(nodeId, port);
+ LocalServer ret = new LocalServer(port, cb);
+ LocalServer existing = _registry.put(key, ret);
+ if (existing != null) {
+ //Can happen if worker is restarted in the same topology, e.g. due to blob update
+ LOG.info("Replacing existing server {} with {} for key {}", existing, ret, key);
}
return ret;
}
- ;
-
@Override
public void prepare(Map<String, Object> topoConf) {
//NOOP
}
@Override
- public IConnection bind(String storm_id, int port) {
- return getLocalServer(storm_id, port);
+ public IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
+ return createLocalServer(storm_id, port, cb);
}
@Override
public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
- return new LocalClient(getLocalServer(storm_id, port));
+ return new LocalClient(storm_id, port);
}
@Override
@@ -74,25 +75,16 @@ public class Context implements IContext {
//NOOP
}
- private static class LocalServer implements IConnection {
+ private class LocalServer implements IConnection {
final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
final int port;
- volatile IConnectionCallback _cb;
+ final IConnectionCallback _cb;
- public LocalServer(int port) {
+ public LocalServer(int port, IConnectionCallback cb) {
this.port = port;
+ this._cb = cb;
}
-
- @Override
- public void registerRecv(IConnectionCallback cb) {
- _cb = cb;
- }
-
- @Override
- public void registerNewConnectionResponse(Supplier<Object> cb) {
- return;
- }
-
+
@Override
public void send(Iterator<TaskMessage> msgs) {
throw new IllegalArgumentException("SHOULD NOT HAPPEN");
@@ -131,14 +123,16 @@ public class Context implements IContext {
}
}
- private static class LocalClient implements IConnection {
- private final LocalServer _server;
+ private class LocalClient implements IConnection {
//Messages sent before the server registered a callback
private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
private final ScheduledExecutorService _pendingFlusher;
+ private final int port;
+ private final String registryKey;
- public LocalClient(LocalServer server) {
- _server = server;
+ public LocalClient(String stormId, int port) {
+ this.port = port;
+ this.registryKey = getNodeKey(stormId, port);
_pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
_pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
@@ -163,35 +157,26 @@ public class Context implements IContext {
}, 5, 5, TimeUnit.SECONDS);
}
- @Override
- public void registerRecv(IConnectionCallback cb) {
- throw new IllegalArgumentException("SHOULD NOT HAPPEN");
- }
-
- @Override
- public void registerNewConnectionResponse(Supplier<Object> cb) {
- throw new IllegalArgumentException("SHOULD NOT HAPPEN");
- }
-
private void flushPending() {
- IConnectionCallback serverCb = _server._cb;
- if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
+ //Can't cache server in client, server can change when workers restart.
+ LocalServer server = _registry.get(registryKey);
+ if (server != null && !_pendingDueToUnregisteredServer.isEmpty()) {
ArrayList<TaskMessage> ret = new ArrayList<>();
_pendingDueToUnregisteredServer.drainTo(ret);
- serverCb.recv(ret);
+ server._cb.recv(ret);
}
}
@Override
public void send(Iterator<TaskMessage> msgs) {
- IConnectionCallback serverCb = _server._cb;
- if (serverCb != null) {
+ LocalServer server = _registry.get(registryKey);
+ if (server != null) {
flushPending();
ArrayList<TaskMessage> ret = new ArrayList<>();
while (msgs.hasNext()) {
ret.add(msgs.next());
}
- serverCb.recv(ret);
+ server._cb.recv(ret);
} else {
while (msgs.hasNext()) {
_pendingDueToUnregisteredServer.add(msgs.next());
@@ -201,12 +186,19 @@ public class Context implements IContext {
@Override
public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
- return _server.getLoad(tasks);
+ LocalServer server = _registry.get(registryKey);
+ if (server != null) {
+ return server.getLoad(tasks);
+ }
+ return Collections.emptyMap();
}
@Override
public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
- _server.sendLoadMetrics(taskToLoad);
+ LocalServer server = _registry.get(registryKey);
+ if (server != null) {
+ server.sendLoadMetrics(taskToLoad);
+ }
}
@Override
@@ -216,7 +208,7 @@ public class Context implements IContext {
@Override
public int getPort() {
- return _server.getPort();
+ return port;
}
@Override
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index fe2fe16..61a9c99 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -236,21 +236,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
}
}
- /**
- * Receiving messages is not supported by a client.
- *
- * @throws java.lang.UnsupportedOperationException whenever this method is being called.
- */
- @Override
- public void registerRecv(IConnectionCallback cb) {
- throw new UnsupportedOperationException("Client connection should not receive any messages");
- }
-
- @Override
- public void registerNewConnectionResponse(Supplier<Object> cb) {
- throw new UnsupportedOperationException("Client does not accept new connections");
- }
-
@Override
public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
throw new RuntimeException("Client connection should not send load metrics");
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 27ccd04..ca46c4f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -17,8 +17,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import org.apache.storm.Config;
import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IConnectionCallback;
import org.apache.storm.messaging.IContext;
import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
@@ -53,8 +55,8 @@ public class Context implements IContext {
* establish a server with a binding port
*/
@Override
- public synchronized IConnection bind(String storm_id, int port) {
- Server server = new Server(topoConf, port);
+ public synchronized IConnection bind(String storm_id, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
+ Server server = new Server(topoConf, port, cb, newConnectionResponse);
serverConnections.add(server);
return server;
}
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 7d150c3..a3cd8b0 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -62,14 +62,23 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
private final int port;
private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
private final KryoValuesSerializer ser;
+ private final IConnectionCallback cb;
+ private final Supplier<Object> newConnectionResponse;
private volatile boolean closing = false;
- private IConnectionCallback cb = null;
- private Supplier<Object> newConnectionResponse;
- Server(Map<String, Object> topoConf, int port) {
+ /**
+ * Starts Netty at the given port
+ * @param topoConf The topology config
+ * @param port The port to start Netty at
+ * @param cb The callback to deliver incoming messages to
+ * @param newConnectionResponse The response to send to clients when they connect. Can be null.
+ */
+ Server(Map<String, Object> topoConf, int port, IConnectionCallback cb, Supplier<Object> newConnectionResponse) {
this.topoConf = topoConf;
this.port = port;
ser = new KryoValuesSerializer(topoConf);
+ this.cb = cb;
+ this.newConnectionResponse = newConnectionResponse;
// Configure the server.
int buffer_size = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -136,19 +145,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
return;
}
addReceiveCount(from, msgs.size());
- if (cb != null) {
- cb.recv(msgs);
- }
- }
-
- @Override
- public void registerRecv(IConnectionCallback cb) {
- this.cb = cb;
- }
-
- @Override
- public void registerNewConnectionResponse(Supplier<Object> newConnectionResponse) {
- this.newConnectionResponse = newConnectionResponse;
+ cb.recv(msgs);
}
@Override
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index eaeb17a..e83d37d 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -12,9 +12,11 @@
package org.apache.storm.security.auth;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import javax.security.auth.login.Configuration;
+import org.apache.storm.security.auth.sasl.SaslTransportPlugin;
import org.apache.storm.thrift.TProcessor;
import org.apache.storm.thrift.server.TServer;
import org.apache.storm.thrift.transport.TTransportException;
@@ -30,6 +32,7 @@ public class ThriftServer {
private Configuration loginConf;
private int port;
private boolean areWorkerTokensSupported;
+ private ITransportPlugin transportPlugin;
public ThriftServer(Map<String, Object> conf, TProcessor processor, ThriftConnectionType type) {
this.conf = conf;
@@ -44,7 +47,7 @@ public class ThriftServer {
}
try {
//locate our thrift transport plugin
- ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
+ transportPlugin = ClientAuthUtils.getTransportPlugin(this.type, this.conf, loginConf);
//server
server = transportPlugin.getServer(this.processor);
port = transportPlugin.getPort();
@@ -57,6 +60,9 @@ public class ThriftServer {
public void stop() {
server.stop();
+ if (transportPlugin instanceof SaslTransportPlugin) {
+ ((SaslTransportPlugin)transportPlugin).close();
+ }
}
/**
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
index 54731ee..463b841 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/digest/DigestSaslTransportPlugin.java
@@ -34,11 +34,15 @@ import org.slf4j.LoggerFactory;
public class DigestSaslTransportPlugin extends SaslTransportPlugin {
public static final String DIGEST = "DIGEST-MD5";
private static final Logger LOG = LoggerFactory.getLogger(DigestSaslTransportPlugin.class);
+ private WorkerTokenAuthorizer workerTokenAuthorizer;
protected TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
+ if (workerTokenAuthorizer == null) {
+ workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
+ }
//create an authentication callback handler
CallbackHandler serverCallbackHandler = new SimpleSaslServerCallbackHandler(impersonationAllowed,
- new WorkerTokenAuthorizer(conf, type),
+ workerTokenAuthorizer,
new JassPasswordProvider(loginConf));
//create a transport factory that will invoke our auth callback for digest
@@ -93,4 +97,9 @@ public class DigestSaslTransportPlugin extends SaslTransportPlugin {
public boolean areWorkerTokensSupported() {
return true;
}
+
+ @Override
+ public void close() {
+ workerTokenAuthorizer.close();
+ }
}
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index 081037b..27ea878 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -50,9 +50,13 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
private static final String DISABLE_LOGIN_CACHE = "disableLoginCache";
private static Map<LoginCacheKey, Login> loginCache = new ConcurrentHashMap<>();
+ private WorkerTokenAuthorizer workerTokenAuthorizer;
@Override
public TTransportFactory getServerTransportFactory(boolean impersonationAllowed) throws IOException {
+ if (workerTokenAuthorizer == null) {
+ workerTokenAuthorizer = new WorkerTokenAuthorizer(conf, type);
+ }
//create an authentication callback handler
CallbackHandler server_callback_handler = new ServerCallbackHandler(loginConf, impersonationAllowed);
@@ -91,7 +95,7 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
//Also add in support for worker tokens
factory.addServerDefinition(DIGEST, ClientAuthUtils.SERVICE, hostName, null,
- new SimpleSaslServerCallbackHandler(impersonationAllowed, new WorkerTokenAuthorizer(conf, type)));
+ new SimpleSaslServerCallbackHandler(impersonationAllowed, workerTokenAuthorizer));
//create a wrap transport factory so that we could apply user credential during connections
TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
@@ -234,6 +238,11 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
return true;
}
+ @Override
+ public void close() {
+ workerTokenAuthorizer.close();
+ }
+
/**
* A TransportFactory that wraps another one, but assumes a specified UGI before calling through.
*
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
index d40ad6f..1694caa 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/sasl/SaslTransportPlugin.java
@@ -12,6 +12,7 @@
package org.apache.storm.security.auth.sasl;
+import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.security.Principal;
@@ -45,7 +46,7 @@ import org.apache.storm.utils.ExtendedThreadPoolExecutor;
/**
* Base class for SASL authentication plugin.
*/
-public abstract class SaslTransportPlugin implements ITransportPlugin {
+public abstract class SaslTransportPlugin implements ITransportPlugin, Closeable {
protected ThriftConnectionType type;
protected Map<String, Object> conf;
protected Configuration loginConf;
@@ -82,9 +83,9 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
if (serverTransportFactory != null) {
serverArgs.transportFactory(serverTransportFactory);
}
- BlockingQueue workQueue = new SynchronousQueue();
+ BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
if (queueSize != null) {
- workQueue = new ArrayBlockingQueue(queueSize);
+ workQueue = new ArrayBlockingQueue<>(queueSize);
}
ThreadPoolExecutor executorService = new ExtendedThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
60, TimeUnit.SECONDS, workQueue);
@@ -92,6 +93,10 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
return new TThreadPoolServer(serverArgs);
}
+ @Override
+ public void close() {
+ }
+
/**
* Create the transport factory needed for serving. All subclass must implement this method.
*
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
index 6c7dbb6..f321221 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
@@ -12,6 +12,8 @@
package org.apache.storm.security.auth.workertoken;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
@@ -39,9 +41,10 @@ import org.slf4j.LoggerFactory;
/**
* Allow for SASL authentication using worker tokens.
*/
-public class WorkerTokenAuthorizer implements PasswordProvider {
+public class WorkerTokenAuthorizer implements PasswordProvider, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(WorkerTokenAuthorizer.class);
private final LoadingCache<WorkerTokenInfo, PrivateWorkerKey> keyCache;
+ private final IStormClusterState state;
/**
* Constructor.
@@ -72,6 +75,7 @@ public class WorkerTokenAuthorizer implements PasswordProvider {
});
}
keyCache = tmpKeyCache;
+ this.state = state;
}
private static IStormClusterState buildStateIfNeeded(Map<String, Object> conf, ThriftConnectionType connectionType) {
@@ -141,4 +145,11 @@ public class WorkerTokenAuthorizer implements PasswordProvider {
WorkerTokenInfo deser = Utils.deserialize(user, WorkerTokenInfo.class);
return deser.get_userName();
}
+
+ @Override
+ public void close() {
+ if (state != null) {
+ state.disconnect();
+ }
+ }
}
\ No newline at end of file
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
index 477bf05..19f016f 100644
--- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
+++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
@@ -46,14 +46,13 @@ import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.messaging.TransportFactory;
import org.apache.storm.utils.Utils;
import org.junit.Test;
-import org.mockito.internal.matchers.LessThan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyTest {
private static final Logger LOG = LoggerFactory.getLogger(NettyTest.class);
-
+
private final AtomicBoolean[] remoteBpStatus = new AtomicBoolean[]{new AtomicBoolean(), new AtomicBoolean()};
private final int taskId = 1;
@@ -114,9 +113,8 @@ public class NettyTest {
IContext context = TransportFactory.makeContext(stormConf);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
- try (IConnection server = context.bind(null, 0);
+ try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
- server.registerRecv(mkConnectionCallback(response::set));
waitUntilReady(client, server);
byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
@@ -176,9 +174,8 @@ public class NettyTest {
IContext context = TransportFactory.makeContext(stormConf);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
- try (IConnection server = context.bind(null, 0);
+ try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
- server.registerRecv(mkConnectionCallback(response::set));
waitUntilReady(client, server);
byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
@@ -231,9 +228,8 @@ public class NettyTest {
IContext context = TransportFactory.makeContext(stormConf);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
- try (IConnection server = context.bind(null, 0);
+ try (IConnection server = context.bind(null, 0, mkConnectionCallback(response::set), null);
IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
- server.registerRecv(mkConnectionCallback(response::set));
waitUntilReady(client, server);
byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
@@ -278,8 +274,7 @@ public class NettyTest {
CompletableFuture<?> serverStart = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
- server.set(context.bind(null, port));
- server.get().registerRecv(mkConnectionCallback(response::set));
+ server.set(context.bind(null, port, mkConnectionCallback(response::set), null));
waitUntilReady(client, server.get());
} catch (Exception e) {
throw Utils.wrapInRuntime(e);
@@ -322,12 +317,11 @@ public class NettyTest {
AtomicInteger received = new AtomicInteger();
IContext context = TransportFactory.makeContext(stormConf);
try {
- try (IConnection server = context.bind(null, 0);
- IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
- server.registerRecv(mkConnectionCallback((message) -> {
+ try (IConnection server = context.bind(null, 0, mkConnectionCallback((message) -> {
responses.add(message);
received.incrementAndGet();
- }));
+ }), null);
+ IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
waitUntilReady(client, server);
IntStream.range(1, numMessages)
@@ -375,8 +369,7 @@ public class NettyTest {
try (IConnection client = context.connect(null, "localhost", port, remoteBpStatus)) {
byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
send(client, taskId, messageBytes);
- try (IConnection server = context.bind(null, port)) {
- server.registerRecv(mkConnectionCallback(response::set));
+ try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null)) {
waitUntilReady(client, server);
send(client, taskId, messageBytes);
waitForNotNull(response);
@@ -406,9 +399,8 @@ public class NettyTest {
IContext context = TransportFactory.makeContext(stormConf);
try {
AtomicReference<TaskMessage> response = new AtomicReference<>();
- try (IConnection server = context.bind(null, port);
+ try (IConnection server = context.bind(null, port, mkConnectionCallback(response::set), null);
IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
- server.registerRecv(mkConnectionCallback(response::set));
waitUntilReady(client, server);
byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 222435f..eb7a746 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -134,7 +134,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
private final Nimbus nimbus;
//This is very private and does not need to be exposed
- private final AtomicInteger portCounter;
+ private int portCounter;
private final Map<String, Object> daemonConf;
private final List<Supervisor> supervisors;
private final IStateStorage state;
@@ -225,7 +225,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
this.daemonConf = new HashMap<>(conf);
this.metricRegistry = new StormMetricsRegistry();
- this.portCounter = new AtomicInteger(builder.supervisorSlotPortMin);
+ this.portCounter = builder.supervisorSlotPortMin;
ClusterStateContext cs = new ClusterStateContext(DaemonType.NIMBUS, daemonConf);
this.state = ClusterUtils.mkStateStorage(this.daemonConf, null, cs);
if (builder.clusterState == null) {
@@ -691,7 +691,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
List<Integer> portNumbers = new ArrayList<>(ports.intValue());
for (int i = 0; i < ports.intValue(); i++) {
- portNumbers.add(portCounter.getAndIncrement());
+ portNumbers.add(portCounter++);
}
Map<String, Object> superConf = new HashMap<>(daemonConf);
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
index 77d9d01..b6bfd47 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -415,6 +415,7 @@ public class LocalFsBlobStore extends BlobStore {
if (timer != null) {
timer.cancel();;
}
+ stormClusterState.disconnect();
}
@Override
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 4607862..60d5e61 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -835,7 +835,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
* Set a new assignment asynchronously.
* @param newAssignment the new assignment for this slot to run, null to run nothing
*/
- public void setNewAssignment(LocalAssignment newAssignment) {
+ public final void setNewAssignment(LocalAssignment newAssignment) {
this.newAssignment.set(newAssignment == null ? null : new TimerDecoratedAssignment(newAssignment, staticState.slotMetrics.workerLaunchDuration));
}
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index f1bc79b..fc80b6b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -343,9 +343,7 @@ public class AsyncLocalizer implements AutoCloseable {
@Override
public void close() throws InterruptedException {
- if (execService != null) {
- execService.shutdown();
- }
+ execService.shutdown();
}
private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
diff --git a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
index 3f1fb04..2d00680 100644
--- a/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
+++ b/storm-server/src/test/java/org/apache/storm/security/auth/workertoken/WorkerTokenTest.java
@@ -81,10 +81,11 @@ public class WorkerTokenTest {
assertEquals(ONE_DAY_MILLIS, info.get_expirationTimeMillis());
assertEquals(versionNumber, info.get_secretVersion());
- //Verify the signature...
- WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState);
- byte[] signature = wta.getSignedPasswordFor(wt.get_info(), info);
- assertArrayEquals(wt.get_signature(), signature);
+ try (WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState)) {
+ //Verify the signature...
+ byte[] signature = wta.getSignedPasswordFor(wt.get_info(), info);
+ assertArrayEquals(wt.get_signature(), signature);
+ }
}
}
@@ -135,13 +136,14 @@ public class WorkerTokenTest {
//Expire the token
Time.advanceTime(ONE_DAY_MILLIS + 1);
- //Verify the signature...
- WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState);
- try {
- wta.getSignedPasswordFor(wt.get_info(), info);
- fail("Expected an expired token to not be signed!!!");
- } catch (IllegalArgumentException ia) {
- //What we want...
+ try (WorkerTokenAuthorizer wta = new WorkerTokenAuthorizer(type, mockState)) {
+ try {
+ //Verify the signature...
+ wta.getSignedPasswordFor(wt.get_info(), info);
+ fail("Expected an expired token to not be signed!!!");
+ } catch (IllegalArgumentException ia) {
+ //What we want...
+ }
}
//Verify if WorkerTokenManager recognizes the expired WorkerToken.