You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:09 UTC
[02/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
deleted file mode 100644
index d0a2f88..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestRegionUnavailable.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.feature.DefaultFeatureProvider;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.feature.SettableFeature;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test Case for {@link org.apache.distributedlog.exceptions.RegionUnavailableException}.
- */
-public class TestRegionUnavailable extends DistributedLogServerTestCase {
-
- /**
- * A feature provider for testing.
- */
- public static class TestFeatureProvider extends DefaultFeatureProvider {
-
- public TestFeatureProvider(String rootScope,
- DistributedLogConfiguration conf,
- StatsLogger statsLogger) {
- super(rootScope, conf, statsLogger);
- }
-
- @Override
- protected Feature makeFeature(String featureName) {
- if (featureName.contains(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase())) {
- return new SettableFeature(featureName, 10000);
- }
- return super.makeFeature(featureName);
- }
-
- @Override
- protected FeatureProvider makeProvider(String fullScopeName) {
- return super.makeProvider(fullScopeName);
- }
- }
-
- private final int numServersPerDC = 3;
- private final List<DLServer> localCluster;
- private final List<DLServer> remoteCluster;
- private TwoRegionDLClient client;
-
- public TestRegionUnavailable() {
- super(true);
- this.localCluster = new ArrayList<DLServer>();
- this.remoteCluster = new ArrayList<DLServer>();
- }
-
- @Before
- @Override
- public void setup() throws Exception {
- DistributedLogConfiguration localConf = new DistributedLogConfiguration();
- localConf.addConfiguration(conf);
- localConf.setFeatureProviderClass(TestFeatureProvider.class);
- DistributedLogConfiguration remoteConf = new DistributedLogConfiguration();
- remoteConf.addConfiguration(conf);
- super.setup();
- int localPort = 9010;
- int remotePort = 9020;
- for (int i = 0; i < numServersPerDC; i++) {
- localCluster.add(createDistributedLogServer(localConf, localPort + i));
- remoteCluster.add(createDistributedLogServer(remoteConf, remotePort + i));
- }
- Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>();
- for (DLServer server : localCluster) {
- regionMap.put(server.getAddress(), "local");
- }
- for (DLServer server : remoteCluster) {
- regionMap.put(server.getAddress(), "remote");
- }
- client = createTwoRegionDLClient("two_regions_client", regionMap);
-
- }
-
- private void registerStream(String streamName) {
- for (DLServer server : localCluster) {
- client.localRoutingService.addHost(streamName, server.getAddress());
- }
- client.remoteRoutingService.addHost(streamName, remoteCluster.get(0).getAddress());
- }
-
- @After
- @Override
- public void teardown() throws Exception {
- super.teardown();
- if (null != client) {
- client.shutdown();
- }
- for (DLServer server : localCluster) {
- server.shutdown();
- }
- for (DLServer server : remoteCluster) {
- server.shutdown();
- }
- }
-
- @Test(timeout = 60000)
- public void testRegionUnavailable() throws Exception {
- String name = "dlserver-region-unavailable";
- registerStream(name);
-
- for (long i = 1; i <= 10; i++) {
- client.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
- }
-
- // check local region
- for (DLServer server : localCluster) {
- checkStreams(0, server);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
deleted file mode 100644
index c8b8bdf..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestStatsFilter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static org.junit.Assert.assertEquals;
-
-import com.twitter.finagle.Service;
-import com.twitter.finagle.service.ConstantService;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.junit.Test;
-
-/**
- * Test Case for {@link StatsFilter}.
- */
-public class TestStatsFilter {
-
- class RuntimeExService<Req, Rep> extends Service<Req, Rep> {
- public Future<Rep> apply(Req request) {
- throw new RuntimeException("test");
- }
- }
-
- @Test(timeout = 60000)
- public void testServiceSuccess() throws Exception {
- StatsLogger stats = new NullStatsLogger();
- StatsFilter<String, String> filter = new StatsFilter<String, String>(stats);
- Future<String> result = filter.apply("", new ConstantService<String, String>(Future.value("result")));
- assertEquals("result", Await.result(result));
- }
-
- @Test(timeout = 60000)
- public void testServiceFailure() throws Exception {
- StatsLogger stats = new NullStatsLogger();
- StatsFilter<String, String> filter = new StatsFilter<String, String>(stats);
- try {
- filter.apply("", new RuntimeExService<String, String>());
- } catch (RuntimeException ex) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
deleted file mode 100644
index 21bebb5..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestBalancerUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Test;
-
-/**
- * Test Case for {@link BalancerUtils}.
- */
-public class TestBalancerUtils {
-
- @Test(timeout = 60000)
- public void testCalculateNumStreamsToRebalance() {
- String myNode = "mynode";
-
- // empty load distribution
- assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
- myNode, new HashMap<String, Integer>(), 0, 10));
- // my node doesn't exist in load distribution
- Map<String, Integer> loadDistribution = new HashMap<String, Integer>();
- loadDistribution.put("node1", 10);
- assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
- myNode, loadDistribution, 0, 10));
- // my node doesn't reach rebalance water mark
- loadDistribution.clear();
- loadDistribution.put("node1", 1);
- loadDistribution.put(myNode, 100);
- assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
- myNode, loadDistribution, 200, 10));
- // my node is below average in the cluster.
- loadDistribution.clear();
- loadDistribution.put(myNode, 1);
- loadDistribution.put("node1", 99);
- assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
- myNode, loadDistribution, 0, 10));
- // my node is above average in the cluster
- assertEquals(49, BalancerUtils.calculateNumStreamsToRebalance(
- "node1", loadDistribution, 0, 10));
- // my node is at the tolerance range
- loadDistribution.clear();
- loadDistribution.put(myNode, 55);
- loadDistribution.put("node1", 45);
- assertEquals(0, BalancerUtils.calculateNumStreamsToRebalance(
- myNode, loadDistribution, 0, 10));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
deleted file mode 100644
index fb3fb6e..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestClusterBalancer.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.fail;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.DLSocketAddress;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.DistributedLogServerTestCase;
-import com.twitter.util.Await;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for {@link ClusterBalancer}.
- */
-public class TestClusterBalancer extends DistributedLogServerTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class);
-
- private final int numServers = 5;
- private final List<DLServer> cluster;
- private DLClient client;
-
- public TestClusterBalancer() {
- super(true);
- this.cluster = new ArrayList<DLServer>();
- }
-
- @Before
- @Override
- public void setup() throws Exception {
- super.setup();
- int initPort = 9001;
- for (int i = 0; i < numServers; i++) {
- cluster.add(createDistributedLogServer(initPort + i));
- }
- client = createDistributedLogClient("cluster_client", Optional.<String>absent());
- }
-
- @After
- @Override
- public void teardown() throws Exception {
- super.teardown();
- if (null != client) {
- client.shutdown();
- }
- for (DLServer server: cluster) {
- server.shutdown();
- }
- }
-
- private void initStreams(String namePrefix) {
- logger.info("Init streams with prefix {}", namePrefix);
- // Stream Distribution: 5, 4, 3, 2, 1
- initStreams(namePrefix, 5, 1, 0);
- initStreams(namePrefix, 4, 6, 1);
- initStreams(namePrefix, 3, 10, 2);
- initStreams(namePrefix, 2, 13, 3);
- initStreams(namePrefix, 1, 15, 4);
- }
-
- private void initStreams(String namePrefix, int numStreams, int streamId, int proxyId) {
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + (streamId++);
- client.routingService.addHost(name, cluster.get(proxyId).getAddress());
- }
- }
-
- private void writeStreams(String namePrefix) throws Exception {
- logger.info("Write streams with prefix {}", namePrefix);
- writeStreams(namePrefix, 5, 1);
- writeStreams(namePrefix, 4, 6);
- writeStreams(namePrefix, 3, 10);
- writeStreams(namePrefix, 2, 13);
- writeStreams(namePrefix, 1, 15);
- }
-
- private void writeStreams(String namePrefix, int numStreams, int streamId) throws Exception {
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + (streamId++);
- try {
- Await.result(client.dlClient.write(name, ByteBuffer.wrap(name.getBytes(UTF_8))));
- } catch (Exception e) {
- logger.error("Error writing stream {} : ", name, e);
- throw e;
- }
- }
- }
-
- private void validateStreams(String namePrefix) throws Exception {
- logger.info("Validate streams with prefix {}", namePrefix);
- validateStreams(namePrefix, 5, 1, 0);
- validateStreams(namePrefix, 4, 6, 1);
- validateStreams(namePrefix, 3, 10, 2);
- validateStreams(namePrefix, 2, 13, 3);
- validateStreams(namePrefix, 1, 15, 4);
- }
-
- private void validateStreams(String namePrefix, int numStreams, int streamId, int proxyIdx) {
- Set<String> expectedStreams = new HashSet<String>();
- for (int i = 0; i < numStreams; i++) {
- expectedStreams.add(namePrefix + (streamId++));
- }
- checkStreams(expectedStreams, cluster.get(proxyIdx));
- }
-
- @Ignore
- @Test(timeout = 60000)
- public void testBalanceAll() throws Exception {
- String namePrefix = "clusterbalancer-balance-all-";
-
- initStreams(namePrefix);
- writeStreams(namePrefix);
- validateStreams(namePrefix);
-
- Optional<RateLimiter> rateLimiter = Optional.absent();
-
- Balancer balancer = new ClusterBalancer(client.dlClientBuilder,
- Pair.of((DistributedLogClient) client.dlClient, (MonitorServiceClient) client.dlClient));
- logger.info("Rebalancing from 'unknown' target");
- try {
- balancer.balanceAll("unknown", 10, rateLimiter);
- fail("Should fail on balanceAll from 'unknown' target.");
- } catch (IllegalArgumentException iae) {
- // expected
- }
- validateStreams(namePrefix);
-
- logger.info("Rebalancing from 'unexisted' host");
- String addr = DLSocketAddress.toString(DLSocketAddress.getSocketAddress(9999));
- balancer.balanceAll(addr, 10, rateLimiter);
- validateStreams(namePrefix);
-
- addr = DLSocketAddress.toString(cluster.get(0).getAddress());
- logger.info("Rebalancing from host {}.", addr);
- balancer.balanceAll(addr, 10, rateLimiter);
- checkStreams(0, cluster.get(0));
- checkStreams(4, cluster.get(1));
- checkStreams(3, cluster.get(2));
- checkStreams(4, cluster.get(3));
- checkStreams(4, cluster.get(4));
-
- addr = DLSocketAddress.toString(cluster.get(2).getAddress());
- logger.info("Rebalancing from host {}.", addr);
- balancer.balanceAll(addr, 10, rateLimiter);
- checkStreams(3, cluster.get(0));
- checkStreams(4, cluster.get(1));
- checkStreams(0, cluster.get(2));
- checkStreams(4, cluster.get(3));
- checkStreams(4, cluster.get(4));
-
- logger.info("Rebalancing the cluster");
- balancer.balance(0, 0.0f, 10, rateLimiter);
- for (int i = 0; i < 5; i++) {
- checkStreams(3, cluster.get(i));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
deleted file mode 100644
index 6734083..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestCountBasedStreamChooser.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Sets;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Test;
-
-/**
- * Test Case for {@link CountBasedStreamChooser}.
- */
-public class TestCountBasedStreamChooser {
-
- @Test(timeout = 60000)
- public void testEmptyStreamDistribution() {
- try {
- new CountBasedStreamChooser(new HashMap<SocketAddress, Set<String>>());
- fail("Should fail constructing stream chooser if the stream distribution is empty");
- } catch (IllegalArgumentException iae) {
- // expected
- }
- }
-
- @Test(timeout = 60000)
- public void testMultipleHostsWithEmptyStreams() {
- for (int i = 1; i <= 3; i++) {
- Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
- int port = 1000;
- for (int j = 0; j < i; j++) {
- SocketAddress address = new InetSocketAddress("127.0.0.1", port + j);
- streamDistribution.put(address, new HashSet<String>());
- }
-
- CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
- for (int k = 0; k < i + 1; k++) {
- assertNull(chooser.choose());
- }
- }
- }
-
- @Test(timeout = 60000)
- public void testSingleHostWithStreams() {
- for (int i = 0; i < 3; i++) {
- Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
-
- Set<String> streams = new HashSet<String>();
- for (int j = 0; j < 3; j++) {
- streams.add("SingleHostStream-" + j);
- }
-
- int port = 1000;
- SocketAddress address = new InetSocketAddress("127.0.0.1", port);
- streamDistribution.put(address, streams);
-
- for (int k = 1; k <= i; k++) {
- address = new InetSocketAddress("127.0.0.1", port + k);
- streamDistribution.put(address, new HashSet<String>());
- }
-
- Set<String> choosenStreams = new HashSet<String>();
-
- CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
- for (int l = 0; l < 3 + i + 1; l++) {
- String s = chooser.choose();
- if (null != s) {
- choosenStreams.add(s);
- }
- }
-
- assertEquals(streams.size(), choosenStreams.size());
- assertTrue(Sets.difference(streams, choosenStreams).immutableCopy().isEmpty());
- }
- }
-
- @Test(timeout = 60000)
- public void testHostsHaveSameNumberStreams() {
- Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
- Set<String> allStreams = new HashSet<String>();
-
- int numHosts = 3;
- int numStreamsPerHost = 3;
-
- int port = 1000;
- for (int i = 1; i <= numHosts; i++) {
- SocketAddress address = new InetSocketAddress("127.0.0.1", port + i);
- Set<String> streams = new HashSet<String>();
-
- for (int j = 1; j <= numStreamsPerHost; j++) {
- String streamName = "HostsHaveSameNumberStreams-" + i + "-" + j;
- streams.add(streamName);
- allStreams.add(streamName);
- }
-
- streamDistribution.put(address, streams);
- }
-
- Set<String> streamsChoosen = new HashSet<String>();
- CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
- for (int i = 1; i <= numStreamsPerHost; i++) {
- for (int j = 1; j <= numHosts; j++) {
- String s = chooser.choose();
- assertNotNull(s);
- streamsChoosen.add(s);
- }
- for (int j = 0; j < numHosts; j++) {
- assertEquals(numStreamsPerHost - i, chooser.streamsDistribution.get(j).getRight().size());
- }
- }
- assertNull(chooser.choose());
- assertEquals(numHosts * numStreamsPerHost, streamsChoosen.size());
- assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty());
- }
-
- @Test(timeout = 60000)
- public void testHostsHaveDifferentNumberStreams() {
- Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
- Set<String> allStreams = new HashSet<String>();
-
- int numHosts = 6;
- int maxStreamsPerHost = 4;
-
- int port = 1000;
- for (int i = 0; i < numHosts; i++) {
- int group = i / 2;
- int numStreamsThisGroup = maxStreamsPerHost - group;
-
- SocketAddress address = new InetSocketAddress("127.0.0.1", port + i);
- Set<String> streams = new HashSet<String>();
-
- for (int j = 1; j <= numStreamsThisGroup; j++) {
- String streamName = "HostsHaveDifferentNumberStreams-" + i + "-" + j;
- streams.add(streamName);
- allStreams.add(streamName);
- }
-
- streamDistribution.put(address, streams);
- }
-
- Set<String> streamsChoosen = new HashSet<String>();
- CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
-
- for (int i = 0; i < allStreams.size(); i++) {
- String s = chooser.choose();
- assertNotNull(s);
- streamsChoosen.add(s);
- }
- assertNull(chooser.choose());
- assertEquals(allStreams.size(), streamsChoosen.size());
- assertTrue(Sets.difference(allStreams, streamsChoosen).isEmpty());
- }
-
- @Test(timeout = 60000)
- public void testLimitedStreamChooser() {
- Map<SocketAddress, Set<String>> streamDistribution = new HashMap<SocketAddress, Set<String>>();
-
- Set<String> streams = new HashSet<String>();
- for (int j = 0; j < 10; j++) {
- streams.add("SingleHostStream-" + j);
- }
-
- int port = 1000;
- SocketAddress address = new InetSocketAddress("127.0.0.1", port);
- streamDistribution.put(address, streams);
-
- Set<String> choosenStreams = new HashSet<String>();
-
- CountBasedStreamChooser underlying = new CountBasedStreamChooser(streamDistribution);
- LimitedStreamChooser chooser = LimitedStreamChooser.of(underlying, 1);
- for (int l = 0; l < 10; l++) {
- String s = chooser.choose();
- if (null != s) {
- choosenStreams.add(s);
- }
- }
-
- assertEquals(1, choosenStreams.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
deleted file mode 100644
index 73fa98a..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestSimpleBalancer.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.DistributedLogServerTestCase;
-import com.twitter.util.Await;
-import java.nio.ByteBuffer;
-import java.util.Set;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for {@link SimpleBalancer}.
- */
-public class TestSimpleBalancer extends DistributedLogServerTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(TestSimpleBalancer.class);
-
- DLClient targetClient;
- DLServer targetServer;
-
- public TestSimpleBalancer() {
- super(true);
- }
-
- @Before
- @Override
- public void setup() throws Exception {
- super.setup();
- targetServer = createDistributedLogServer(7003);
- targetClient = createDistributedLogClient("target", Optional.<String>absent());
- }
-
- @After
- @Override
- public void teardown() throws Exception {
- super.teardown();
- if (null != targetClient) {
- targetClient.shutdown();
- }
- if (null != targetServer) {
- targetServer.shutdown();
- }
- }
-
- @Test(timeout = 60000)
- public void testBalanceAll() throws Exception {
- String namePrefix = "simplebalancer-balance-all-";
- int numStreams = 10;
-
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + i;
- // src client
- dlClient.routingService.addHost(name, dlServer.getAddress());
- // target client
- targetClient.routingService.addHost(name, targetServer.getAddress());
- }
-
- // write to multiple streams
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + i;
- Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
- }
-
- // validation
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + i;
- checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true);
- checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
- }
-
- Optional<RateLimiter> rateLimiter = Optional.absent();
-
- Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, dlClient.dlClient,
- "target", targetClient.dlClient, targetClient.dlClient);
- logger.info("Rebalancing from 'unknown' target");
- try {
- balancer.balanceAll("unknown", 10, rateLimiter);
- fail("Should fail on balanceAll from 'unknown' target.");
- } catch (IllegalArgumentException iae) {
- // expected
- }
-
- // nothing to balance from 'target'
- logger.info("Rebalancing from 'target' target");
- balancer.balanceAll("target", 1, rateLimiter);
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + i;
- checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true);
- checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
- }
-
- // balance all streams from 'source'
- logger.info("Rebalancing from 'source' target");
- balancer.balanceAll("source", 10, rateLimiter);
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + i;
- checkStream(name, targetClient, targetServer, 1, numStreams, numStreams, true, true);
- checkStream(name, dlClient, dlServer, 0, 0, 0, false, false);
- }
- }
-
- @Test(timeout = 60000)
- public void testBalanceStreams() throws Exception {
- String namePrefix = "simplebalancer-balance-streams-";
- int numStreams = 10;
-
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + i;
- // src client
- dlClient.routingService.addHost(name, dlServer.getAddress());
- // target client
- targetClient.routingService.addHost(name, targetServer.getAddress());
- }
-
- // write to multiple streams
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + i;
- Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
- }
-
- // validation
- for (int i = 0; i < numStreams; i++) {
- String name = namePrefix + i;
- checkStream(name, dlClient, dlServer, 1, numStreams, numStreams, true, true);
- checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
- }
-
- Optional<RateLimiter> rateLimiter = Optional.absent();
-
- Balancer balancer = new SimpleBalancer("source", dlClient.dlClient, dlClient.dlClient,
- "target", targetClient.dlClient, targetClient.dlClient);
-
- // balance all streams from 'source'
- logger.info("Rebalancing streams between targets");
- balancer.balance(0, 0, 10, rateLimiter);
-
- Set<String> sourceStreams = getAllStreamsFromDistribution(getStreamOwnershipDistribution(dlClient));
- Set<String> targetStreams = getAllStreamsFromDistribution(getStreamOwnershipDistribution(targetClient));
-
- assertEquals(numStreams / 2, sourceStreams.size());
- assertEquals(numStreams / 2, targetStreams.size());
-
- for (String name : sourceStreams) {
- checkStream(name, dlClient, dlServer, 1, numStreams / 2, numStreams / 2, true, true);
- checkStream(name, targetClient, targetServer, 1, numStreams / 2, numStreams / 2, false, false);
- }
-
- for (String name : targetStreams) {
- checkStream(name, targetClient, targetServer, 1, numStreams / 2, numStreams / 2, true, true);
- checkStream(name, dlClient, dlServer, 1, numStreams / 2, numStreams / 2, false, false);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
deleted file mode 100644
index ce7b2c1..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/balancer/TestStreamMover.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.Optional;
-import org.apache.distributedlog.service.DistributedLogClient;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.DistributedLogServerTestCase;
-import com.twitter.util.Await;
-import java.nio.ByteBuffer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test Case for {@link StreamMover}.
- */
-public class TestStreamMover extends DistributedLogServerTestCase {
-
- DLClient targetClient;
- DLServer targetServer;
-
- public TestStreamMover() {
- super(true);
- }
-
- @Before
- @Override
- public void setup() throws Exception {
- super.setup();
- targetServer = createDistributedLogServer(7003);
- targetClient = createDistributedLogClient("target", Optional.<String>absent());
- }
-
- @After
- @Override
- public void teardown() throws Exception {
- super.teardown();
- if (null != targetClient) {
- targetClient.shutdown();
- }
- if (null != targetServer) {
- targetServer.shutdown();
- }
- }
-
- @Test(timeout = 60000)
- public void testMoveStream() throws Exception {
- String name = "dlserver-move-stream";
-
- // src client
- dlClient.routingService.addHost(name, dlServer.getAddress());
- // target client
- targetClient.routingService.addHost(name, targetServer.getAddress());
-
- // src client write a record to that stream
- Await.result(((DistributedLogClient) dlClient.dlClient).write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
- checkStream(name, dlClient, dlServer, 1, 1, 1, true, true);
- checkStream(name, targetClient, targetServer, 0, 0, 0, false, false);
-
- StreamMover streamMover = new StreamMoverImpl("source", dlClient.dlClient, dlClient.dlClient,
- "target", targetClient.dlClient, targetClient.dlClient);
- assertTrue(streamMover.moveStream(name));
- checkStream(name, dlClient, dlServer, 0, 0, 0, false, false);
- checkStream(name, targetClient, targetServer, 1, 1, 1, true, true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
deleted file mode 100644
index 71dfa45..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestServerConfiguration.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-/**
- * Test Case for {@link ServerConfiguration}.
- */
-public class TestServerConfiguration {
-
- @Test(timeout = 60000, expected = IllegalArgumentException.class)
- public void testUnassignedShardId() {
- new ServerConfiguration().validate();
- }
-
- @Test(timeout = 60000)
- public void testAssignedShardId() {
- ServerConfiguration conf = new ServerConfiguration();
- conf.setServerShardId(100);
- conf.validate();
- assertEquals(100, conf.getServerShardId());
- }
-
- @Test(timeout = 60000, expected = IllegalArgumentException.class)
- public void testInvalidServerThreads() {
- ServerConfiguration conf = new ServerConfiguration();
- conf.setServerShardId(100);
- conf.setServerThreads(-1);
- conf.validate();
- }
-
- @Test(timeout = 60000, expected = IllegalArgumentException.class)
- public void testInvalidDlsnVersion() {
- ServerConfiguration conf = new ServerConfiguration();
- conf.setServerShardId(100);
- conf.setDlsnVersion((byte) 9999);
- conf.validate();
- }
-
- @Test(timeout = 60000)
- public void testUseHostnameAsAllocatorPoolName() {
- ServerConfiguration conf = new ServerConfiguration();
- assertFalse("Should not use hostname by default", conf.isUseHostnameAsAllocatorPoolName());
- conf.setUseHostnameAsAllocatorPoolName(true);
- assertTrue("Should use hostname now", conf.isUseHostnameAsAllocatorPoolName());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
deleted file mode 100644
index bdbde11..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/config/TestStreamConfigProvider.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import static org.apache.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.config.PropertiesWriter;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-
-/**
- * Test Case for {@link StreamConfigProvider}.
- */
-public class TestStreamConfigProvider {
- private static final String DEFAULT_CONFIG_DIR = "conf";
- private final String defaultConfigPath;
- private final ScheduledExecutorService configExecutorService;
-
- public TestStreamConfigProvider() throws Exception {
- this.configExecutorService = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder().setNameFormat("DistributedLogService-Dyncfg-%d").build());
- PropertiesWriter writer = new PropertiesWriter();
- writer.save();
- this.defaultConfigPath = writer.getFile().getPath();
- }
-
- StreamConfigProvider getServiceProvider(StreamPartitionConverter converter)
- throws Exception {
- return getServiceProvider(converter, DEFAULT_CONFIG_DIR);
- }
-
- StreamConfigProvider getServiceProvider(
- StreamPartitionConverter converter,
- String configPath,
- String defaultPath) throws Exception {
- return new ServiceStreamConfigProvider(
- configPath,
- defaultPath,
- converter,
- configExecutorService,
- 1,
- TimeUnit.SECONDS);
- }
-
- StreamConfigProvider getServiceProvider(
- StreamPartitionConverter converter,
- String configPath) throws Exception {
- return getServiceProvider(converter, configPath, defaultConfigPath);
- }
-
- StreamConfigProvider getDefaultProvider(String configFile) throws Exception {
- return new DefaultStreamConfigProvider(configFile, configExecutorService, 1, TimeUnit.SECONDS);
- }
-
- StreamConfigProvider getNullProvider() throws Exception {
- return new NullStreamConfigProvider();
- }
-
- @Test(timeout = 60000)
- public void testServiceProviderWithConfigRouters() throws Exception {
- getServiceProvider(new IdentityStreamPartitionConverter());
- }
-
- @Test(timeout = 60000)
- public void testServiceProviderWithMissingConfig() throws Exception {
- StreamConfigProvider provider = getServiceProvider(new IdentityStreamPartitionConverter());
- Optional<DynamicDistributedLogConfiguration> config = provider.getDynamicStreamConfig("stream1");
- assertTrue(config.isPresent());
- }
-
- @Test(timeout = 60000)
- public void testServiceProviderWithDefaultConfigPath() throws Exception {
- // Default config with property set.
- PropertiesWriter writer1 = new PropertiesWriter();
- writer1.setProperty("rpsStreamAcquireServiceLimit", "191919");
- writer1.save();
- String fallbackConfPath1 = writer1.getFile().getPath();
- StreamConfigProvider provider1 = getServiceProvider(new IdentityStreamPartitionConverter(),
- DEFAULT_CONFIG_DIR, fallbackConfPath1);
- Optional<DynamicDistributedLogConfiguration> config1 = provider1.getDynamicStreamConfig("stream1");
-
- // Empty default config.
- PropertiesWriter writer2 = new PropertiesWriter();
- writer2.save();
- String fallbackConfPath2 = writer2.getFile().getPath();
- StreamConfigProvider provider2 = getServiceProvider(new IdentityStreamPartitionConverter(),
- DEFAULT_CONFIG_DIR, fallbackConfPath2);
- Optional<DynamicDistributedLogConfiguration> config2 = provider2.getDynamicStreamConfig("stream1");
-
- assertEquals(191919, config1.get().getRpsStreamAcquireServiceLimit());
- assertEquals(-1, config2.get().getRpsStreamAcquireServiceLimit());
- }
-
- @Test(timeout = 60000)
- public void testDefaultProvider() throws Exception {
- PropertiesWriter writer = new PropertiesWriter();
- writer.setProperty(BKDL_RETENTION_PERIOD_IN_HOURS, "99");
- writer.save();
- StreamConfigProvider provider = getDefaultProvider(writer.getFile().getPath());
- Optional<DynamicDistributedLogConfiguration> config1 = provider.getDynamicStreamConfig("stream1");
- Optional<DynamicDistributedLogConfiguration> config2 = provider.getDynamicStreamConfig("stream2");
- assertTrue(config1.isPresent());
- assertTrue(config1.get() == config2.get());
- assertEquals(99, config1.get().getRetentionPeriodHours());
- }
-
- @Test(timeout = 60000)
- public void testNullProvider() throws Exception {
- StreamConfigProvider provider = getNullProvider();
- Optional<DynamicDistributedLogConfiguration> config1 = provider.getDynamicStreamConfig("stream1");
- Optional<DynamicDistributedLogConfiguration> config2 = provider.getDynamicStreamConfig("stream2");
- assertFalse(config1.isPresent());
- assertTrue(config1 == config2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
deleted file mode 100644
index 5f5ecd4..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.LinkedHashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Test Case for {@link LeastLoadPlacementPolicy}.
- */
-public class TestLeastLoadPlacementPolicy {
-
- @Test(timeout = 10000)
- public void testCalculateBalances() throws Exception {
- int numSevers = new Random().nextInt(20) + 1;
- int numStreams = new Random().nextInt(200) + 1;
- RoutingService mockRoutingService = mock(RoutingService.class);
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
- LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
- new EqualLoadAppraiser(),
- mockRoutingService,
- mockNamespace,
- null,
- Duration.fromSeconds(600),
- new NullStatsLogger());
- TreeSet<ServerLoad> serverLoads =
- Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
- long lowLoadPerServer = numStreams / numSevers;
- long highLoadPerServer = lowLoadPerServer + 1;
- for (ServerLoad serverLoad : serverLoads) {
- long load = serverLoad.getLoad();
- assertEquals(load, serverLoad.getStreamLoads().size());
- assertTrue(String.format("Load %d is not between %d and %d",
- load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer);
- }
- }
-
- @Test(timeout = 10000)
- public void testRefreshAndPlaceStream() throws Exception {
- int numSevers = new Random().nextInt(20) + 1;
- int numStreams = new Random().nextInt(200) + 1;
- RoutingService mockRoutingService = mock(RoutingService.class);
- when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
- try {
- when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
- } catch (IOException e) {
- fail();
- }
- PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
- LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
- new EqualLoadAppraiser(),
- mockRoutingService,
- mockNamespace,
- mockPlacementStateManager,
- Duration.fromSeconds(600),
- new NullStatsLogger());
- leastLoadPlacementPolicy.refresh();
-
- final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
- verify(mockPlacementStateManager).saveOwnership(captor.capture());
- TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>) captor.getValue();
- ServerLoad next = serverLoads.first();
- String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
- assertEquals(next.getServer(), serverPlacement);
- }
-
- @Test(timeout = 10000)
- public void testCalculateUnequalWeight() throws Exception {
- int numSevers = new Random().nextInt(20) + 1;
- int numStreams = new Random().nextInt(200) + 1;
- /* use AtomicInteger to have a final object in answer method */
- final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
- RoutingService mockRoutingService = mock(RoutingService.class);
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
- LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
- when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
- @Override
- public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable {
- int load = new Random().nextInt(100000);
- if (load > maxLoad.get()) {
- maxLoad.set(load);
- }
- return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
- }
- });
- LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
- mockLoadAppraiser,
- mockRoutingService,
- mockNamespace,
- null,
- Duration.fromSeconds(600),
- new NullStatsLogger());
- TreeSet<ServerLoad> serverLoads =
- Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
- long highestLoadSeen = Long.MIN_VALUE;
- long lowestLoadSeen = Long.MAX_VALUE;
- for (ServerLoad serverLoad : serverLoads) {
- long load = serverLoad.getLoad();
- if (load < lowestLoadSeen) {
- lowestLoadSeen = load;
- }
- if (load > highestLoadSeen) {
- highestLoadSeen = load;
- }
- }
- assertTrue("Unexpected placement for " + numStreams + " streams to "
- + numSevers + " servers : highest load = " + highestLoadSeen
- + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(),
- highestLoadSeen - lowestLoadSeen <= maxLoad.get());
- }
-
- private Set<SocketAddress> generateSocketAddresses(int num) {
- LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
- for (int i = 0; i < num; i++) {
- socketAddresses.add(new InetSocketAddress(i));
- }
- return socketAddresses;
- }
-
- private Set<String> generateStreams(int num) {
- LinkedHashSet<String> streams = new LinkedHashSet<String>();
- for (int i = 0; i < num; i++) {
- streams.add("stream_" + i);
- }
- return streams;
- }
-
- private Set<String> generateServers(int num) {
- LinkedHashSet<String> servers = new LinkedHashSet<String>();
- for (int i = 0; i < num; i++) {
- servers.add("server_" + i);
- }
- return servers;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
deleted file mode 100644
index 5bd234f..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestServerLoad.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import org.junit.Test;
-
-/**
- * Test Case for {@link ServerLoad}.
- */
-public class TestServerLoad {
-
- @Test(timeout = 60000)
- public void testSerializeDeserialize() throws IOException {
- final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
- for (int i = 0; i < 20; i++) {
- serverLoad.addStream(new StreamLoad("stream-" + i, i));
- }
- assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
- }
-
- @Test(timeout = 60000)
- public void testGetLoad() throws IOException {
- final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
- assertEquals(0, serverLoad.getLoad());
- serverLoad.addStream(new StreamLoad("stream-" + 1, 3));
- assertEquals(3, serverLoad.getLoad());
- serverLoad.addStream(new StreamLoad("stream-" + 2, 7));
- assertEquals(10, serverLoad.getLoad());
- serverLoad.addStream(new StreamLoad("stream-" + 3, 1));
- assertEquals(11, serverLoad.getLoad());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
deleted file mode 100644
index 36a6fed..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestStreamLoad.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import org.junit.Test;
-
-/**
- * Test Case for {@link StreamLoad}.
- */
-public class TestStreamLoad {
-
- @Test(timeout = 10000)
- public void testSerializeDeserialize() throws IOException {
- final String streamName = "aHellaRandomStreamName";
- final int load = 1337;
- final StreamLoad streamLoad = new StreamLoad(streamName, load);
- assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize()));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
deleted file mode 100644
index 07ec5a5..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/placement/TestZKPlacementStateManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import java.io.IOException;
-import java.net.URI;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.curator.test.TestingServer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test Case for {@link ZKPlacementStateManager}.
- */
-public class TestZKPlacementStateManager {
- private TestingServer zkTestServer;
- private String zkServers;
- private URI uri;
- private ZKPlacementStateManager zkPlacementStateManager;
-
- @Before
- public void startZookeeper() throws Exception {
- zkTestServer = new TestingServer(2181);
- zkServers = "127.0.0.1:2181";
- uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
- zkPlacementStateManager =
- new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
- }
-
- @Test(timeout = 60000)
- public void testSaveLoad() throws Exception {
- TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
- zkPlacementStateManager.saveOwnership(ownerships);
- SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
- assertEquals(ownerships, loadedOwnerships);
-
- ownerships.add(new ServerLoad("emptyServer"));
- zkPlacementStateManager.saveOwnership(ownerships);
- loadedOwnerships = zkPlacementStateManager.loadOwnership();
- assertEquals(ownerships, loadedOwnerships);
-
- ServerLoad sl1 = new ServerLoad("server1");
- sl1.addStream(new StreamLoad("stream1", 3));
- sl1.addStream(new StreamLoad("stream2", 4));
- ServerLoad sl2 = new ServerLoad("server2");
- sl2.addStream(new StreamLoad("stream3", 1));
- ownerships.add(sl1);
- ownerships.add(sl2);
- zkPlacementStateManager.saveOwnership(ownerships);
- loadedOwnerships = zkPlacementStateManager.loadOwnership();
- assertEquals(ownerships, loadedOwnerships);
-
- loadedOwnerships.remove(sl1);
- zkPlacementStateManager.saveOwnership(ownerships);
- loadedOwnerships = zkPlacementStateManager.loadOwnership();
- assertEquals(ownerships, loadedOwnerships);
- }
-
- private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
- LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
- int expectedNumServerLoads) throws InterruptedException {
- TreeSet<ServerLoad> notification = notificationQueue.take();
- assertNotNull(notification);
- while (notification.size() < expectedNumServerLoads) {
- notification = notificationQueue.take();
- }
- assertEquals(expectedNumServerLoads, notification.size());
- return notification;
- }
-
- @Test(timeout = 60000)
- public void testWatchIndefinitely() throws Exception {
- TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
- ownerships.add(new ServerLoad("server1"));
- final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
- new LinkedBlockingQueue<TreeSet<ServerLoad>>();
- PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
- @Override
- public void callback(TreeSet<ServerLoad> serverLoads) {
- serverLoadNotifications.add(serverLoads);
- }
- };
- zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
- zkPlacementStateManager.watch(callback);
- // cannot verify the callback here as it may call before the verify is called
-
- zkPlacementStateManager.saveOwnership(ownerships);
- assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
-
- ServerLoad server2 = new ServerLoad("server2");
- server2.addStream(new StreamLoad("hella-important-stream", 415));
- ownerships.add(server2);
- zkPlacementStateManager.saveOwnership(ownerships);
- assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
- }
-
- @Test(timeout = 60000)
- public void testZkFormatting() throws Exception {
- final String server = "host/10.0.0.0:31351";
- final String zkFormattedServer = "host--10.0.0.0:31351";
- URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
- ZKPlacementStateManager zkPlacementStateManager =
- new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
- assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
- assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
- }
-
- @After
- public void stopZookeeper() throws IOException {
- zkTestServer.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
deleted file mode 100644
index 56e9483..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.config.StreamConfigProvider;
-import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import com.twitter.util.Await;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Case for StreamManager.
- */
-public class TestStreamManager {
-
- @Rule
- public TestName testName = new TestName();
-
- ScheduledExecutorService mockExecutorService = mock(ScheduledExecutorService.class);
-
- @Test(timeout = 60000)
- public void testCollectionMethods() throws Exception {
- Stream mockStream = mock(Stream.class);
- when(mockStream.getStreamName()).thenReturn("stream1");
- when(mockStream.getPartition()).thenReturn(new Partition("stream1", 0));
- StreamFactory mockStreamFactory = mock(StreamFactory.class);
- StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class);
- StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class);
- when(mockStreamFactory.create(
- (String) any(),
- (DynamicDistributedLogConfiguration) any(),
- (StreamManager) any())).thenReturn(mockStream);
- StreamManager streamManager = new StreamManagerImpl(
- "",
- new DistributedLogConfiguration(),
- mockExecutorService,
- mockStreamFactory,
- mockPartitionConverter,
- mockStreamConfigProvider,
- mock(DistributedLogNamespace.class));
-
- assertFalse(streamManager.isAcquired("stream1"));
- assertEquals(0, streamManager.numAcquired());
- assertEquals(0, streamManager.numCached());
-
- streamManager.notifyAcquired(mockStream);
- assertTrue(streamManager.isAcquired("stream1"));
- assertEquals(1, streamManager.numAcquired());
- assertEquals(0, streamManager.numCached());
-
- streamManager.notifyReleased(mockStream);
- assertFalse(streamManager.isAcquired("stream1"));
- assertEquals(0, streamManager.numAcquired());
- assertEquals(0, streamManager.numCached());
-
- streamManager.notifyAcquired(mockStream);
- assertTrue(streamManager.isAcquired("stream1"));
- assertEquals(1, streamManager.numAcquired());
- assertEquals(0, streamManager.numCached());
-
- streamManager.notifyAcquired(mockStream);
- assertTrue(streamManager.isAcquired("stream1"));
- assertEquals(1, streamManager.numAcquired());
- assertEquals(0, streamManager.numCached());
-
- streamManager.notifyReleased(mockStream);
- assertFalse(streamManager.isAcquired("stream1"));
- assertEquals(0, streamManager.numAcquired());
- assertEquals(0, streamManager.numCached());
-
- streamManager.notifyReleased(mockStream);
- assertFalse(streamManager.isAcquired("stream1"));
- assertEquals(0, streamManager.numAcquired());
- assertEquals(0, streamManager.numCached());
- }
-
- @Test(timeout = 60000)
- public void testCreateStream() throws Exception {
- Stream mockStream = mock(Stream.class);
- final String streamName = "stream1";
- when(mockStream.getStreamName()).thenReturn(streamName);
- StreamFactory mockStreamFactory = mock(StreamFactory.class);
- StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class);
- StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class);
- when(mockStreamFactory.create(
- (String) any(),
- (DynamicDistributedLogConfiguration) any(),
- (StreamManager) any())
- ).thenReturn(mockStream);
- DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class);
- ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
-
- StreamManager streamManager = new StreamManagerImpl(
- "",
- new DistributedLogConfiguration(),
- executorService,
- mockStreamFactory,
- mockPartitionConverter,
- mockStreamConfigProvider,
- dlNamespace);
-
- assertTrue(Await.ready(streamManager.createStreamAsync(streamName)).isReturn());
- verify(dlNamespace).createLog(streamName);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
deleted file mode 100644
index a18fda1..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.stream;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.distributedlog.AsyncLogWriter;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.acl.DefaultAccessControlManager;
-import org.apache.distributedlog.exceptions.InternalServerException;
-import org.apache.distributedlog.service.ResponseUtils;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import java.nio.ByteBuffer;
-import org.apache.bookkeeper.feature.SettableFeature;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Case for StreamOps.
- */
-public class TestStreamOp {
-
- @Rule
- public TestName testName = new TestName();
-
- private WriteOp getWriteOp() {
- SettableFeature disabledFeature = new SettableFeature("", 0);
- return new WriteOp("test",
- ByteBuffer.wrap("test".getBytes()),
- new NullStatsLogger(),
- new NullStatsLogger(),
- new IdentityStreamPartitionConverter(),
- new ServerConfiguration(),
- (byte) 0,
- null,
- false,
- disabledFeature,
- DefaultAccessControlManager.INSTANCE);
- }
-
- @Test(timeout = 60000)
- public void testResponseFailedTwice() throws Exception {
- WriteOp writeOp = getWriteOp();
- writeOp.fail(new InternalServerException("test1"));
- writeOp.fail(new InternalServerException("test2"));
-
- WriteResponse response = Await.result(writeOp.result());
- assertEquals(StatusCode.INTERNAL_SERVER_ERROR, response.getHeader().getCode());
- assertEquals(ResponseUtils.exceptionToHeader(new InternalServerException("test1")), response.getHeader());
- }
-
- @Test(timeout = 60000)
- public void testResponseSucceededThenFailed() throws Exception {
- AsyncLogWriter writer = mock(AsyncLogWriter.class);
- when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3)));
- when(writer.getStreamName()).thenReturn("test");
- WriteOp writeOp = getWriteOp();
- writeOp.execute(writer, new Sequencer() {
- public long nextId() {
- return 0;
- }
- }, new Object());
- writeOp.fail(new InternalServerException("test2"));
-
- WriteResponse response = Await.result(writeOp.result());
- assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
- }
-}