You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2018/05/20 21:26:47 UTC

[GitHub] storm pull request #2685: STORM-1294: Port netty-unit-test to Java

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2685

    STORM-1294: Port netty-unit-test to Java

    https://issues.apache.org/jira/browse/STORM-1294
    
    The batch-test was not being run, and wouldn't pass with the given configuration. It was disabled in this PR https://github.com/apache/storm/pull/847/files#diff-d183c4c873554507040b705bf17624b8. Since no one mentioned it, it looks like it might just have been a mistake. Updating the configuration to match the one used in the other tests makes the test pass, so I think we might as well include it.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-1294

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2685.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2685
    
----
commit d4ebac9c3b7d5ef58f8860780058f590a2d4e107
Author: Stig Rohde Døssing <sr...@...>
Date:   2018-05-20T19:50:18Z

    STORM-1294: Port netty-unit-test to Java

----


---

[GitHub] storm pull request #2685: STORM-1294: Port netty-unit-test to Java

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2685


---

[GitHub] storm pull request #2685: STORM-1294: Port netty-unit-test to Java

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2685#discussion_r192579594
  
    --- Diff: storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Consumer;
    +import java.util.stream.IntStream;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.Testing;
    +import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.grouping.Load;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IConnectionCallback;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.messaging.TransportFactory;
    +import org.apache.storm.utils.Utils;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class NettyTest {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(NettyTest.class);
    +
    +    private final AtomicBoolean[] remoteBpStatus = new AtomicBoolean[]{new AtomicBoolean(), new AtomicBoolean()};
    +    private final int taskId = 1;
    +
    +    /**
    +     * In a "real" cluster (or an integration test), Storm itself would ensure that a topology's workers would only be activated once all
    +     * the workers' connections are ready. The tests in this file however launch Netty servers and clients directly, and thus we must ensure
    +     * manually that the server and the client connections are ready before we commence testing. If we don't do this, then we will lose the
    +     * first messages being sent between the client and the server, which will fail the tests.
    +     */
    +    private void waitUntilReady(IConnection... connections) throws Exception {
    +        LOG.info("Waiting until all Netty connections are ready...");
    +        int intervalMs = 10;
    +        int maxWaitMs = 5000;
    +        int waitedMs = 0;
    +        while (true) {
    +            if (Arrays.asList(connections).stream()
    +                .allMatch(WorkerState::isConnectionReady)) {
    +                LOG.info("All Netty connections are ready");
    +                break;
    +            }
    +            if (waitedMs > maxWaitMs) {
    +                throw new RuntimeException("Netty connections were not ready within " + maxWaitMs + " ms");
    +            }
    +            Thread.sleep(intervalMs);
    +            waitedMs += intervalMs;
    +        }
    +    }
    +
    +    private IConnectionCallback mkConnectionCallback(Consumer<TaskMessage> myFn) {
    +        return (batch) -> {
    +            batch.forEach(myFn::accept);
    +        };
    +    }
    +
    +    private Runnable sleep() {
    +        return () -> {
    +            try {
    +                Thread.sleep(10);
    +            } catch (InterruptedException e) {
    +                throw Utils.wrapInRuntime(e);
    +            }
    +        };
    +    }
    +
    +    private void waitForNotNull(AtomicReference<TaskMessage> response) {
    +        Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
    +            () -> response.get() == null,
    +            sleep());
    +    }
    +
    +    private void doTestBasic(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("1. Should send and receive a basic message");
    +        String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    private Map<String, Object> basicConf() {
    +        Map<String, Object> stormConf = new HashMap<>();
    +        stormConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, false);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_RETRIES, 10);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 1000);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 5000);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS, 1);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS, 1);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK, 8388608);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK, 16777216);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT, 1);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT, 1000);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS, 1);
    +        stormConf.put(Config.TOPOLOGY_KRYO_FACTORY, "org.apache.storm.serialization.DefaultKryoFactory");
    +        stormConf.put(Config.TOPOLOGY_TUPLE_SERIALIZER, "org.apache.storm.serialization.types.ListDelegateSerializer");
    +        stormConf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false);
    +        stormConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, false);
    +        return stormConf;
    +    }
    +
    +    private Map<String, Object> withSaslConf(Map<String, Object> stormConf) {
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, true);
    +        stormConf.put(Config.TOPOLOGY_NAME, "topo1-netty-sasl");
    +        stormConf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, Utils.secureRandomLong() + ":" + Utils.secureRandomLong());
    +        return stormConf;
    +    }
    +
    +    @Test
    +    public void testBasic() throws Exception {
    +        doTestBasic(basicConf());
    +    }
    +
    +    @Test
    +    public void testBasicWithSasl() throws Exception {
    +        doTestBasic(withSaslConf(basicConf()));
    +    }
    +
    +    private void doTestLoad(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("2 test load");
    +        String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +                Map<Integer, Double> taskToLoad = new HashMap<>();
    +                taskToLoad.put(1, 0.0);
    +                taskToLoad.put(2, 1.0);
    +                server.sendLoadMetrics(taskToLoad);
    +
    +                List<Integer> tasks = new ArrayList<>();
    +                tasks.add(1);
    +                tasks.add(2);
    +                Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
    +                    () -> client.getLoad(tasks).isEmpty(),
    +                    sleep());
    +                Map<Integer, Load> load = client.getLoad(tasks);
    +                assertThat(load.get(1).getBoltLoad(), is(0.0));
    +                assertThat(load.get(2).getBoltLoad(), is(1.0));
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    @Test
    +    public void testLoad() throws Exception {
    +        doTestLoad(basicConf());
    +    }
    +
    +    @Test
    +    public void testLoadWithSasl() throws Exception {
    +        doTestLoad(withSaslConf(basicConf()));
    +    }
    +
    +    private void doTestLargeMessage(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("3 Should send and receive a large message");
    +        String reqMessage = StringUtils.repeat("c", 2048000);
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    private Map<String, Object> largeMessageConf() {
    +        Map<String, Object> conf = basicConf();
    +        conf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024000);
    +        return conf;
    +    }
    +    
    +    @Test
    +    public void testLargeMessage() throws Exception {
    +        doTestLargeMessage(largeMessageConf());
    +    }
    +
    +    @Test
    +    public void testLargeMessageWithSasl() throws Exception {
    +        doTestLargeMessage(withSaslConf(largeMessageConf()));
    +    }
    +
    +    private void doTestServerDelayed(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("4. test server delayed");
    +        String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            int port = Utils.getAvailablePort(6700);
    +            try (IConnection client = context.connect(null, "localhost", port, remoteBpStatus)) {
    +                AtomicReference<IConnection> server = new AtomicReference<>();
    +                try {
    +                    CompletableFuture<?> serverStart = CompletableFuture.runAsync(() -> {
    +                        try {
    +                            Thread.sleep(100);
    +                            server.set(context.bind(null, port));
    +                            server.get().registerRecv(mkConnectionCallback(response::set));
    +                            waitUntilReady(client, server.get());
    +                        } catch (Exception e) {
    +                            throw Utils.wrapInRuntime(e);
    +                        }
    +                    });
    +                    serverStart.get(Testing.TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    +                    byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                    client.send(taskId, messageBytes);
    +
    +                    waitForNotNull(response);
    +                    TaskMessage responseMessage = response.get();
    +                    assertThat(responseMessage.task(), is(taskId));
    +                    assertThat(responseMessage.message(), is(messageBytes));
    +                } finally {
    +                    if (server.get() != null) {
    +                        server.get().close();
    +                    }
    +                }
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    @Test
    +    public void testServerDelayed() throws Exception {
    +        doTestServerDelayed(basicConf());
    +    }
    +
    +    @Test
    +    public void testServerDelayedWithSasl() throws Exception {
    +        doTestServerDelayed(withSaslConf(basicConf()));
    +    }
    +
    +    private void doTestBatch(Map<String, Object> stormConf) throws Exception {
    +        int numMessages = 100000;
    +        LOG.info("Should send and receive many messages (testing with " + numMessages + " messages)");
    +        ArrayList<TaskMessage> responses = new ArrayList<>();
    +        AtomicInteger received = new AtomicInteger();
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback((message) -> {
    +                    responses.add(message);
    +                    received.incrementAndGet();
    +                }));
    +                waitUntilReady(client, server);
    +
    +                IntStream.range(1, numMessages)
    +                    .forEach(i -> client.send(taskId, String.valueOf(i).getBytes(StandardCharsets.UTF_8)));
    +
    +                Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
    +                    () -> responses.size() < numMessages - 1,
    +                    () -> {
    +                        LOG.info("{} of {} received", responses.size(), numMessages - 1);
    +                        sleep().run();
    +                    });
    +                IntStream.range(1, numMessages)
    +                    .forEach(i -> {
    +                        assertThat(new String(responses.get(i - 1).message(), StandardCharsets.UTF_8), is(String.valueOf(i)));
    +                    });
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    @Test
    +    public void testBatch() throws Exception {
    +        doTestBatch(basicConf());
    --- End diff --
    
    This conf should have `STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000` as `test-batch` defines, so you may want to define another method for new configuration.


---

[GitHub] storm pull request #2685: STORM-1294: Port netty-unit-test to Java

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2685#discussion_r192579513
  
    --- Diff: storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Consumer;
    +import java.util.stream.IntStream;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.Testing;
    +import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.grouping.Load;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IConnectionCallback;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.messaging.TransportFactory;
    +import org.apache.storm.utils.Utils;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class NettyTest {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(NettyTest.class);
    +
    +    private final AtomicBoolean[] remoteBpStatus = new AtomicBoolean[]{new AtomicBoolean(), new AtomicBoolean()};
    +    private final int taskId = 1;
    +
    +    /**
    +     * In a "real" cluster (or an integration test), Storm itself would ensure that a topology's workers would only be activated once all
    +     * the workers' connections are ready. The tests in this file however launch Netty servers and clients directly, and thus we must ensure
    +     * manually that the server and the client connections are ready before we commence testing. If we don't do this, then we will lose the
    +     * first messages being sent between the client and the server, which will fail the tests.
    +     */
    +    private void waitUntilReady(IConnection... connections) throws Exception {
    +        LOG.info("Waiting until all Netty connections are ready...");
    +        int intervalMs = 10;
    +        int maxWaitMs = 5000;
    +        int waitedMs = 0;
    +        while (true) {
    +            if (Arrays.asList(connections).stream()
    +                .allMatch(WorkerState::isConnectionReady)) {
    +                LOG.info("All Netty connections are ready");
    +                break;
    +            }
    +            if (waitedMs > maxWaitMs) {
    +                throw new RuntimeException("Netty connections were not ready within " + maxWaitMs + " ms");
    +            }
    +            Thread.sleep(intervalMs);
    +            waitedMs += intervalMs;
    +        }
    +    }
    +
    +    private IConnectionCallback mkConnectionCallback(Consumer<TaskMessage> myFn) {
    +        return (batch) -> {
    +            batch.forEach(myFn::accept);
    +        };
    +    }
    +
    +    private Runnable sleep() {
    +        return () -> {
    +            try {
    +                Thread.sleep(10);
    +            } catch (InterruptedException e) {
    +                throw Utils.wrapInRuntime(e);
    +            }
    +        };
    +    }
    +
    +    private void waitForNotNull(AtomicReference<TaskMessage> response) {
    +        Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
    +            () -> response.get() == null,
    +            sleep());
    +    }
    +
    +    private void doTestBasic(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("1. Should send and receive a basic message");
    +        String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    private Map<String, Object> basicConf() {
    +        Map<String, Object> stormConf = new HashMap<>();
    +        stormConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, false);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_RETRIES, 10);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 1000);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 5000);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS, 1);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS, 1);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK, 8388608);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK, 16777216);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT, 1);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT, 1000);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS, 1);
    +        stormConf.put(Config.TOPOLOGY_KRYO_FACTORY, "org.apache.storm.serialization.DefaultKryoFactory");
    +        stormConf.put(Config.TOPOLOGY_TUPLE_SERIALIZER, "org.apache.storm.serialization.types.ListDelegateSerializer");
    +        stormConf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false);
    +        stormConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, false);
    +        return stormConf;
    +    }
    +
    +    private Map<String, Object> withSaslConf(Map<String, Object> stormConf) {
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, true);
    +        stormConf.put(Config.TOPOLOGY_NAME, "topo1-netty-sasl");
    +        stormConf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, Utils.secureRandomLong() + ":" + Utils.secureRandomLong());
    +        return stormConf;
    +    }
    +
    +    @Test
    +    public void testBasic() throws Exception {
    +        doTestBasic(basicConf());
    +    }
    +
    +    @Test
    +    public void testBasicWithSasl() throws Exception {
    +        doTestBasic(withSaslConf(basicConf()));
    +    }
    +
    +    private void doTestLoad(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("2 test load");
    +        String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +                Map<Integer, Double> taskToLoad = new HashMap<>();
    +                taskToLoad.put(1, 0.0);
    +                taskToLoad.put(2, 1.0);
    +                server.sendLoadMetrics(taskToLoad);
    +
    +                List<Integer> tasks = new ArrayList<>();
    +                tasks.add(1);
    +                tasks.add(2);
    +                Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
    +                    () -> client.getLoad(tasks).isEmpty(),
    +                    sleep());
    +                Map<Integer, Load> load = client.getLoad(tasks);
    +                assertThat(load.get(1).getBoltLoad(), is(0.0));
    +                assertThat(load.get(2).getBoltLoad(), is(1.0));
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    @Test
    +    public void testLoad() throws Exception {
    +        doTestLoad(basicConf());
    +    }
    +
    +    @Test
    +    public void testLoadWithSasl() throws Exception {
    +        doTestLoad(withSaslConf(basicConf()));
    +    }
    +
    +    private void doTestLargeMessage(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("3 Should send and receive a large message");
    +        String reqMessage = StringUtils.repeat("c", 2048000);
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    private Map<String, Object> largeMessageConf() {
    +        Map<String, Object> conf = basicConf();
    +        conf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024000);
    --- End diff --
    
    Looks like this was 102400, as `test-large-msg` defines.


---

[GitHub] storm pull request #2685: STORM-1294: Port netty-unit-test to Java

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2685#discussion_r192583770
  
    --- Diff: storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java ---
    @@ -0,0 +1,382 @@
    +/*
    + * Copyright 2018 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.atomic.AtomicReference;
    +import java.util.function.Consumer;
    +import java.util.stream.IntStream;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.Testing;
    +import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.grouping.Load;
    +import org.apache.storm.messaging.IConnection;
    +import org.apache.storm.messaging.IConnectionCallback;
    +import org.apache.storm.messaging.IContext;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.messaging.TransportFactory;
    +import org.apache.storm.utils.Utils;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class NettyTest {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(NettyTest.class);
    +
    +    private final AtomicBoolean[] remoteBpStatus = new AtomicBoolean[]{new AtomicBoolean(), new AtomicBoolean()};
    +    private final int taskId = 1;
    +
    +    /**
    +     * In a "real" cluster (or an integration test), Storm itself would ensure that a topology's workers would only be activated once all
    +     * the workers' connections are ready. The tests in this file however launch Netty servers and clients directly, and thus we must ensure
    +     * manually that the server and the client connections are ready before we commence testing. If we don't do this, then we will lose the
    +     * first messages being sent between the client and the server, which will fail the tests.
    +     */
    +    private void waitUntilReady(IConnection... connections) throws Exception {
    +        LOG.info("Waiting until all Netty connections are ready...");
    +        int intervalMs = 10;
    +        int maxWaitMs = 5000;
    +        int waitedMs = 0;
    +        while (true) {
    +            if (Arrays.asList(connections).stream()
    +                .allMatch(WorkerState::isConnectionReady)) {
    +                LOG.info("All Netty connections are ready");
    +                break;
    +            }
    +            if (waitedMs > maxWaitMs) {
    +                throw new RuntimeException("Netty connections were not ready within " + maxWaitMs + " ms");
    +            }
    +            Thread.sleep(intervalMs);
    +            waitedMs += intervalMs;
    +        }
    +    }
    +
    +    private IConnectionCallback mkConnectionCallback(Consumer<TaskMessage> myFn) {
    +        return (batch) -> {
    +            batch.forEach(myFn::accept);
    +        };
    +    }
    +
    +    private Runnable sleep() {
    +        return () -> {
    +            try {
    +                Thread.sleep(10);
    +            } catch (InterruptedException e) {
    +                throw Utils.wrapInRuntime(e);
    +            }
    +        };
    +    }
    +
    +    private void waitForNotNull(AtomicReference<TaskMessage> response) {
    +        Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
    +            () -> response.get() == null,
    +            sleep());
    +    }
    +
    +    private void doTestBasic(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("1. Should send and receive a basic message");
    +        String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    private Map<String, Object> basicConf() {
    +        Map<String, Object> stormConf = new HashMap<>();
    +        stormConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, false);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_RETRIES, 10);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 1000);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 5000);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS, 1);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS, 1);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK, 8388608);
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK, 16777216);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL1_COUNT, 1);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL2_COUNT, 1000);
    +        stormConf.put(Config.TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS, 1);
    +        stormConf.put(Config.TOPOLOGY_KRYO_FACTORY, "org.apache.storm.serialization.DefaultKryoFactory");
    +        stormConf.put(Config.TOPOLOGY_TUPLE_SERIALIZER, "org.apache.storm.serialization.types.ListDelegateSerializer");
    +        stormConf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, false);
    +        stormConf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, false);
    +        return stormConf;
    +    }
    +
    +    private Map<String, Object> withSaslConf(Map<String, Object> stormConf) {
    +        stormConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, true);
    +        stormConf.put(Config.TOPOLOGY_NAME, "topo1-netty-sasl");
    +        stormConf.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, Utils.secureRandomLong() + ":" + Utils.secureRandomLong());
    +        return stormConf;
    +    }
    +
    +    @Test
    +    public void testBasic() throws Exception {
    +        doTestBasic(basicConf());
    +    }
    +
    +    @Test
    +    public void testBasicWithSasl() throws Exception {
    +        doTestBasic(withSaslConf(basicConf()));
    +    }
    +
    +    private void doTestLoad(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("2 test load");
    +        String reqMessage = "0123456789abcdefghijklmnopqrstuvwxyz";
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +                Map<Integer, Double> taskToLoad = new HashMap<>();
    +                taskToLoad.put(1, 0.0);
    +                taskToLoad.put(2, 1.0);
    +                server.sendLoadMetrics(taskToLoad);
    +
    +                List<Integer> tasks = new ArrayList<>();
    +                tasks.add(1);
    +                tasks.add(2);
    +                Testing.whileTimeout(Testing.TEST_TIMEOUT_MS,
    +                    () -> client.getLoad(tasks).isEmpty(),
    +                    sleep());
    +                Map<Integer, Load> load = client.getLoad(tasks);
    +                assertThat(load.get(1).getBoltLoad(), is(0.0));
    +                assertThat(load.get(2).getBoltLoad(), is(1.0));
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    @Test
    +    public void testLoad() throws Exception {
    +        doTestLoad(basicConf());
    +    }
    +
    +    @Test
    +    public void testLoadWithSasl() throws Exception {
    +        doTestLoad(withSaslConf(basicConf()));
    +    }
    +
    +    private void doTestLargeMessage(Map<String, Object> stormConf) throws Exception {
    +        LOG.info("3 Should send and receive a large message");
    +        String reqMessage = StringUtils.repeat("c", 2048000);
    +        IContext context = TransportFactory.makeContext(stormConf);
    +        try {
    +            AtomicReference<TaskMessage> response = new AtomicReference<>();
    +            try (IConnection server = context.bind(null, 0);
    +                IConnection client = context.connect(null, "localhost", server.getPort(), remoteBpStatus)) {
    +                server.registerRecv(mkConnectionCallback(response::set));
    +                waitUntilReady(client, server);
    +                byte[] messageBytes = reqMessage.getBytes(StandardCharsets.UTF_8);
    +
    +                client.send(taskId, messageBytes);
    +
    +                waitForNotNull(response);
    +                TaskMessage responseMessage = response.get();
    +                assertThat(responseMessage.task(), is(taskId));
    +                assertThat(responseMessage.message(), is(messageBytes));
    +            }
    +        } finally {
    +            context.term();
    +        }
    +    }
    +
    +    private Map<String, Object> largeMessageConf() {
    +        Map<String, Object> conf = basicConf();
    +        conf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024000);
    --- End diff --
    
    Nice catch


---