You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:10 UTC
[03/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
deleted file mode 100644
index 58b5b2a..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Sets;
-import org.apache.distributedlog.DLMTestUtil;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.client.DistributedLogClientImpl;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.routing.LocalRoutingService;
-import org.apache.distributedlog.client.routing.RegionsRoutingService;
-import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
-import org.apache.distributedlog.service.stream.StreamManager;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Duration;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-/**
- * Base test case for distributedlog servers.
- */
-public abstract class DistributedLogServerTestCase {
-
- protected static DistributedLogConfiguration conf =
- new DistributedLogConfiguration().setLockTimeout(10)
- .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
- protected static DistributedLogConfiguration noAdHocConf =
- new DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false)
- .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
- protected static DistributedLogCluster dlCluster;
- protected static DistributedLogCluster noAdHocCluster;
-
- /**
- * A distributedlog client wrapper for testing.
- */
- protected static class DLClient {
- public final LocalRoutingService routingService;
- public DistributedLogClientBuilder dlClientBuilder;
- public final DistributedLogClientImpl dlClient;
-
- protected DLClient(String name,
- String streamNameRegex,
- Optional<String> serverSideRoutingFinagleName) {
- routingService = LocalRoutingService.newBuilder().build();
- dlClientBuilder = DistributedLogClientBuilder.newBuilder()
- .name(name)
- .clientId(ClientId$.MODULE$.apply(name))
- .routingService(routingService)
- .streamNameRegex(streamNameRegex)
- .handshakeWithClientInfo(true)
- .clientBuilder(ClientBuilder.get()
- .hostConnectionLimit(1)
- .connectionTimeout(Duration.fromSeconds(1))
- .requestTimeout(Duration.fromSeconds(60)));
- if (serverSideRoutingFinagleName.isPresent()) {
- dlClientBuilder =
- dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get());
- }
- dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
- }
-
- public void handshake() {
- dlClient.handshake();
- }
-
- public void shutdown() {
- dlClient.close();
- }
- }
-
- /**
- * A distributedlog client wrapper that talks to two regions.
- */
- protected static class TwoRegionDLClient {
-
- public final LocalRoutingService localRoutingService;
- public final LocalRoutingService remoteRoutingService;
- public final DistributedLogClientBuilder dlClientBuilder;
- public final DistributedLogClientImpl dlClient;
-
- protected TwoRegionDLClient(String name, Map<SocketAddress, String> regionMap) {
- localRoutingService = new LocalRoutingService();
- remoteRoutingService = new LocalRoutingService();
- RegionsRoutingService regionsRoutingService =
- RegionsRoutingService.of(new DefaultRegionResolver(regionMap),
- localRoutingService, remoteRoutingService);
- dlClientBuilder = DistributedLogClientBuilder.newBuilder()
- .name(name)
- .clientId(ClientId$.MODULE$.apply(name))
- .routingService(regionsRoutingService)
- .streamNameRegex(".*")
- .handshakeWithClientInfo(true)
- .maxRedirects(2)
- .clientBuilder(ClientBuilder.get()
- .hostConnectionLimit(1)
- .connectionTimeout(Duration.fromSeconds(1))
- .requestTimeout(Duration.fromSeconds(10)));
- dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
- }
-
- public void shutdown() {
- dlClient.close();
- }
- }
-
- private final boolean clientSideRouting;
- protected DLServer dlServer;
- protected DLClient dlClient;
- protected DLServer noAdHocServer;
- protected DLClient noAdHocClient;
-
- public static DistributedLogCluster createCluster(DistributedLogConfiguration conf) throws Exception {
- return DistributedLogCluster.newBuilder()
- .numBookies(3)
- .shouldStartZK(true)
- .zkServers("127.0.0.1")
- .shouldStartProxy(false)
- .dlConf(conf)
- .bkConf(DLMTestUtil.loadTestBkConf())
- .build();
- }
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- dlCluster = createCluster(conf);
- dlCluster.start();
- }
-
- public void setupNoAdHocCluster() throws Exception {
- noAdHocCluster = createCluster(noAdHocConf);
- noAdHocCluster.start();
- noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false);
- Optional<String> serverSideRoutingFinagleName = Optional.absent();
- if (!clientSideRouting) {
- serverSideRoutingFinagleName =
- Optional.of("inet!" + DLSocketAddress.toString(noAdHocServer.getAddress()));
- }
- noAdHocClient = createDistributedLogClient("no-ad-hoc-client", serverSideRoutingFinagleName);
- }
-
- public void tearDownNoAdHocCluster() throws Exception {
- if (null != noAdHocClient) {
- noAdHocClient.shutdown();
- }
- if (null != noAdHocServer) {
- noAdHocServer.shutdown();
- }
- }
-
- @AfterClass
- public static void teardownCluster() throws Exception {
- if (null != dlCluster) {
- dlCluster.stop();
- }
- if (null != noAdHocCluster) {
- noAdHocCluster.stop();
- }
- }
-
- protected static URI getUri() {
- return dlCluster.getUri();
- }
-
- protected DistributedLogServerTestCase(boolean clientSideRouting) {
- this.clientSideRouting = clientSideRouting;
- }
-
- @Before
- public void setup() throws Exception {
- dlServer = createDistributedLogServer(7001);
- Optional<String> serverSideRoutingFinagleName = Optional.absent();
- if (!clientSideRouting) {
- serverSideRoutingFinagleName =
- Optional.of("inet!" + DLSocketAddress.toString(dlServer.getAddress()));
- }
- dlClient = createDistributedLogClient("test", serverSideRoutingFinagleName);
- }
-
- @After
- public void teardown() throws Exception {
- if (null != dlClient) {
- dlClient.shutdown();
- }
- if (null != dlServer) {
- dlServer.shutdown();
- }
- }
-
- protected DLServer createDistributedLogServer(int port) throws Exception {
- return new DLServer(conf, dlCluster.getUri(), port, false);
- }
-
- protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port)
- throws Exception {
- return new DLServer(conf, dlCluster.getUri(), port, false);
- }
-
- protected DLClient createDistributedLogClient(String clientName,
- Optional<String> serverSideRoutingFinagleName)
- throws Exception {
- return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName);
- }
-
- protected DLClient createDistributedLogClient(String clientName,
- String streamNameRegex,
- Optional<String> serverSideRoutingFinagleName)
- throws Exception {
- return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName);
- }
-
- protected TwoRegionDLClient createTwoRegionDLClient(String clientName,
- Map<SocketAddress, String> regionMap)
- throws Exception {
- return new TwoRegionDLClient(clientName, regionMap);
- }
-
- protected static void checkStreams(int numExpectedStreams, DLServer dlServer) {
- StreamManager streamManager = dlServer.dlServer.getKey().getStreamManager();
- assertEquals(numExpectedStreams, streamManager.numCached());
- assertEquals(numExpectedStreams, streamManager.numAcquired());
- }
-
- protected static void checkStreams(Set<String> streams, DLServer dlServer) {
- StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
- Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
- Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
-
- assertEquals(streams.size(), cachedStreams.size());
- assertEquals(streams.size(), acquiredStreams.size());
- assertTrue(Sets.difference(streams, cachedStreams).isEmpty());
- assertTrue(Sets.difference(streams, acquiredStreams).isEmpty());
- }
-
- protected static void checkStream(String name, DLClient dlClient, DLServer dlServer,
- int expectedNumProxiesInClient, int expectedClientCacheSize,
- int expectedServerCacheSize, boolean existedInServer, boolean existedInClient) {
- Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
- assertEquals(expectedNumProxiesInClient, distribution.size());
-
- if (expectedNumProxiesInClient > 0) {
- Map.Entry<SocketAddress, Set<String>> localEntry =
- distribution.entrySet().iterator().next();
- assertEquals(dlServer.getAddress(), localEntry.getKey());
- assertEquals(expectedClientCacheSize, localEntry.getValue().size());
- assertEquals(existedInClient, localEntry.getValue().contains(name));
- }
-
- StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
- Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
- Set<String> acquiredStreams = streamManager.getCachedStreams().keySet();
-
- assertEquals(expectedServerCacheSize, cachedStreams.size());
- assertEquals(existedInServer, cachedStreams.contains(name));
- assertEquals(expectedServerCacheSize, acquiredStreams.size());
- assertEquals(existedInServer, acquiredStreams.contains(name));
- }
-
- protected static Map<SocketAddress, Set<String>> getStreamOwnershipDistribution(DLClient dlClient) {
- return dlClient.dlClient.getStreamOwnershipDistribution();
- }
-
- protected static Set<String> getAllStreamsFromDistribution(Map<SocketAddress, Set<String>> distribution) {
- Set<String> allStreams = new HashSet<String>();
- for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) {
- allStreams.addAll(entry.getValue());
- }
- return allStreams;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
deleted file mode 100644
index 29a3617..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
+++ /dev/null
@@ -1,720 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.base.Optional;
-import org.apache.distributedlog.AsyncLogReader;
-import org.apache.distributedlog.DLMTestUtil;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.LogReader;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.LogRecordWithDLSN;
-import org.apache.distributedlog.TestZooKeeperClientBuilder;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
-import org.apache.distributedlog.client.routing.LocalRoutingService;
-import org.apache.distributedlog.exceptions.DLException;
-import org.apache.distributedlog.exceptions.LogNotFoundException;
-import org.apache.distributedlog.impl.acl.ZKAccessControl;
-import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import org.apache.distributedlog.thrift.AccessControlEntry;
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId$;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for {@link DistributedLogServer}.
- */
-public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
-
- @Rule
- public TestName testName = new TestName();
-
- protected TestDistributedLogServerBase(boolean clientSideRouting) {
- super(clientSideRouting);
- }
-
- /**
- * {@link https://issues.apache.org/jira/browse/DL-27}.
- */
- @DistributedLogAnnotations.FlakyTest
- @Ignore
- @Test(timeout = 60000)
- public void testBasicWrite() throws Exception {
- String name = "dlserver-basic-write";
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- for (long i = 1; i <= 10; i++) {
- logger.debug("Write entry {} to stream {}.", i, name);
- Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())));
- }
-
- HeartbeatOptions hbOptions = new HeartbeatOptions();
- hbOptions.setSendHeartBeatToReader(true);
- // make sure the first log segment of each stream created
- FutureUtils.result(dlClient.dlClient.heartbeat(name));
-
- DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
- LogReader reader = dlm.getInputStream(1);
- int numRead = 0;
- LogRecord r = reader.readNext(false);
- while (null != r) {
- ++numRead;
- int i = Integer.parseInt(new String(r.getPayload()));
- assertEquals(numRead, i);
- r = reader.readNext(false);
- }
- assertEquals(10, numRead);
- reader.close();
- dlm.close();
- }
-
- /**
- * Sanity check to make sure both checksum flag values work.
- */
- @Test(timeout = 60000)
- public void testChecksumFlag() throws Exception {
- String name = "testChecksumFlag";
- LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
- routingService.addHost(name, dlServer.getAddress());
- DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
- .name(name)
- .clientId(ClientId$.MODULE$.apply("test"))
- .routingService(routingService)
- .handshakeWithClientInfo(true)
- .clientBuilder(ClientBuilder.get()
- .hostConnectionLimit(1)
- .connectionTimeout(Duration.fromSeconds(1))
- .requestTimeout(Duration.fromSeconds(60)))
- .checksum(false);
- DistributedLogClient dlClient = dlClientBuilder.build();
- Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes())));
- dlClient.close();
-
- dlClient = dlClientBuilder.checksum(true).build();
- Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes())));
- dlClient.close();
- }
-
- private void runSimpleBulkWriteTest(int writeCount) throws Exception {
- String name = String.format("dlserver-bulk-write-%d", writeCount);
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount);
- for (long i = 1; i <= writeCount; i++) {
- writes.add(ByteBuffer.wrap(("" + i).getBytes()));
- }
-
- logger.debug("Write {} entries to stream {}.", writeCount, name);
- List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
- assertEquals(futures.size(), writeCount);
- for (Future<DLSN> future : futures) {
- // No throw == pass.
- DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
- }
-
- DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
- LogReader reader = dlm.getInputStream(1);
- int numRead = 0;
- LogRecord r = reader.readNext(false);
- while (null != r) {
- int i = Integer.parseInt(new String(r.getPayload()));
- assertEquals(numRead + 1, i);
- ++numRead;
- r = reader.readNext(false);
- }
- assertEquals(writeCount, numRead);
- reader.close();
- dlm.close();
- }
-
- @Test(timeout = 60000)
- public void testBulkWrite() throws Exception {
- runSimpleBulkWriteTest(100);
- }
-
- @Test(timeout = 60000)
- public void testBulkWriteSingleWrite() throws Exception {
- runSimpleBulkWriteTest(1);
- }
-
- @Test(timeout = 60000)
- public void testBulkWriteEmptyList() throws Exception {
- String name = String.format("dlserver-bulk-write-%d", 0);
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
- List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
-
- assertEquals(0, futures.size());
- }
-
- @Test(timeout = 60000)
- public void testBulkWriteNullArg() throws Exception {
-
- String name = String.format("dlserver-bulk-write-%s", "null");
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
- writes.add(null);
-
- try {
- dlClient.dlClient.writeBulk(name, writes);
- fail("should not have succeeded");
- } catch (NullPointerException npe) {
- // expected
- logger.info("Expected to catch NullPointException.");
- }
- }
-
- @Test(timeout = 60000)
- public void testBulkWriteEmptyBuffer() throws Exception {
- String name = String.format("dlserver-bulk-write-%s", "empty");
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
- writes.add(ByteBuffer.wrap(("").getBytes()));
- writes.add(ByteBuffer.wrap(("").getBytes()));
- List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
- assertEquals(2, futures.size());
- for (Future<DLSN> future : futures) {
- // No throw == pass
- DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
- }
- }
-
- void failDueToWrongException(Exception ex) {
- logger.info("testBulkWritePartialFailure: ", ex);
- fail(String.format("failed with wrong exception %s", ex.getClass().getName()));
- }
-
- int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) {
- int failed = 0;
- for (int i = start; i < finish; i++) {
- Future<DLSN> future = futures.get(i);
- try {
- Await.result(future, Duration.fromSeconds(10));
- fail("future should have failed!");
- } catch (DLException cre) {
- ++failed;
- } catch (Exception ex) {
- failDueToWrongException(ex);
- }
- }
- return failed;
- }
-
- void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
- try {
- Await.result(future, Duration.fromSeconds(10));
- fail("should have failed");
- } catch (DLException dle) {
- assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode());
- } catch (Exception ex) {
- failDueToWrongException(ex);
- }
- }
-
- @Test(timeout = 60000)
- public void testBulkWritePartialFailure() throws Exception {
- String name = String.format("dlserver-bulk-write-%s", "partial-failure");
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- final int writeCount = 100;
-
- List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1);
- for (long i = 1; i <= writeCount; i++) {
- writes.add(ByteBuffer.wrap(("" + i).getBytes()));
- }
- // Too big, will cause partial failure.
- ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
- writes.add(buf);
- for (long i = 1; i <= writeCount; i++) {
- writes.add(ByteBuffer.wrap(("" + i).getBytes()));
- }
-
- // Count succeeded.
- List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
- int succeeded = 0;
- for (int i = 0; i < writeCount; i++) {
- Future<DLSN> future = futures.get(i);
- try {
- Await.result(future, Duration.fromSeconds(10));
- ++succeeded;
- } catch (Exception ex) {
- failDueToWrongException(ex);
- }
- }
-
- validateFailedAsLogRecordTooLong(futures.get(writeCount));
- FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
- assertEquals(writeCount, succeeded);
- }
-
- @Test(timeout = 60000)
- public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception {
- String name = String.format("dlserver-bulk-write-%s", "first-write-failed");
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- final int writeCount = 100;
- List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
- ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
- writes.add(buf);
- for (long i = 1; i <= writeCount; i++) {
- writes.add(ByteBuffer.wrap(("" + i).getBytes()));
- }
-
- List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
- validateFailedAsLogRecordTooLong(futures.get(0));
- FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
- }
-
- @Test(timeout = 60000)
- public void testBulkWriteTotalFailureLostLock() throws Exception {
- String name = String.format("dlserver-bulk-write-%s", "lost-lock");
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- final int writeCount = 8;
- List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
- ByteBuffer buf = ByteBuffer.allocate(8);
- writes.add(buf);
- for (long i = 1; i <= writeCount; i++) {
- writes.add(ByteBuffer.wrap(("" + i).getBytes()));
- }
- // Warm it up with a write.
- Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8)));
-
- // Failpoint a lost lock, make sure the failure gets promoted to an operation failure.
- DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft();
- try {
- FailpointUtils.setFailpoint(
- FailpointUtils.FailPointName.FP_WriteInternalLostLock,
- FailpointUtils.FailPointActions.FailPointAction_Default
- );
- Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext());
- assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code);
- } finally {
- FailpointUtils.removeFailpoint(
- FailpointUtils.FailPointName.FP_WriteInternalLostLock
- );
- }
- }
-
- @Test(timeout = 60000)
- public void testHeartbeat() throws Exception {
- String name = "dlserver-heartbeat";
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- for (long i = 1; i <= 10; i++) {
- logger.debug("Send heartbeat {} to stream {}.", i, name);
- dlClient.dlClient.check(name).get();
- }
-
- logger.debug("Write entry one to stream {}.", name);
- dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get();
-
- Thread.sleep(1000);
-
- DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
- LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
- int numRead = 0;
- // eid=0 => control records
- // other 9 heartbeats will not trigger writing any control records.
- // eid=1 => user entry
- long startEntryId = 1;
- LogRecordWithDLSN r = reader.readNext(false);
- while (null != r) {
- int i = Integer.parseInt(new String(r.getPayload()));
- assertEquals(numRead + 1, i);
- assertEquals(r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)), 0);
- ++numRead;
- ++startEntryId;
- r = reader.readNext(false);
- }
- assertEquals(1, numRead);
- }
-
- @Test(timeout = 60000)
- public void testFenceWrite() throws Exception {
- String name = "dlserver-fence-write";
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- for (long i = 1; i <= 10; i++) {
- logger.debug("Write entry {} to stream {}.", i, name);
- dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
- }
-
- Thread.sleep(1000);
-
- logger.info("Fencing stream {}.", name);
- DLMTestUtil.fenceStream(conf, getUri(), name);
- logger.info("Fenced stream {}.", name);
-
- for (long i = 11; i <= 20; i++) {
- logger.debug("Write entry {} to stream {}.", i, name);
- dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
- }
-
- DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
- LogReader reader = dlm.getInputStream(1);
- int numRead = 0;
- LogRecord r = reader.readNext(false);
- while (null != r) {
- int i = Integer.parseInt(new String(r.getPayload()));
- assertEquals(numRead + 1, i);
- ++numRead;
- r = reader.readNext(false);
- }
- assertEquals(20, numRead);
- reader.close();
- dlm.close();
- }
-
- @Test(timeout = 60000)
- public void testDeleteStream() throws Exception {
- String name = "dlserver-delete-stream";
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- long txid = 101;
- for (long i = 1; i <= 10; i++) {
- long curTxId = txid++;
- logger.debug("Write entry {} to stream {}.", curTxId, name);
- dlClient.dlClient.write(name,
- ByteBuffer.wrap(("" + curTxId).getBytes())).get();
- }
-
- checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
-
- dlClient.dlClient.delete(name).get();
-
- checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
-
- Thread.sleep(1000);
-
- DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri());
- AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN));
- try {
- FutureUtils.result(reader101.readNext());
- fail("Should fail with LogNotFoundException since the stream is deleted");
- } catch (LogNotFoundException lnfe) {
- // expected
- }
- FutureUtils.result(reader101.asyncClose());
- dlm101.close();
-
- txid = 201;
- for (long i = 1; i <= 10; i++) {
- long curTxId = txid++;
- logger.debug("Write entry {} to stream {}.", curTxId, name);
- DLSN dlsn = dlClient.dlClient.write(name,
- ByteBuffer.wrap(("" + curTxId).getBytes())).get();
- }
- Thread.sleep(1000);
-
- DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri());
- LogReader reader201 = dlm201.getInputStream(1);
- int numRead = 0;
- int curTxId = 201;
- LogRecord r = reader201.readNext(false);
- while (null != r) {
- int i = Integer.parseInt(new String(r.getPayload()));
- assertEquals(curTxId++, i);
- ++numRead;
- r = reader201.readNext(false);
- }
- assertEquals(10, numRead);
- reader201.close();
- dlm201.close();
- }
-
- @Test(timeout = 60000)
- public void testCreateStream() throws Exception {
- try {
- setupNoAdHocCluster();
- final String name = "dlserver-create-stream";
-
- noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress());
- assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
- assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
-
- long txid = 101;
- for (long i = 1; i <= 10; i++) {
- long curTxId = txid++;
- logger.debug("Write entry {} to stream {}.", curTxId, name);
- noAdHocClient.dlClient.write(name,
- ByteBuffer.wrap(("" + curTxId).getBytes())).get();
- }
-
- assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
- } finally {
- tearDownNoAdHocCluster();
- }
- }
-
- /**
- * This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing.
- */
- @Test(timeout = 60000)
- public void testCreateStreamTwice() throws Exception {
- try {
- setupNoAdHocCluster();
- final String name = "dlserver-create-stream-twice";
-
- noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress());
- assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
- assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
-
- long txid = 101;
- for (long i = 1; i <= 10; i++) {
- long curTxId = txid++;
- logger.debug("Write entry {} to stream {}.", curTxId, name);
- noAdHocClient.dlClient.write(name,
- ByteBuffer.wrap(("" + curTxId).getBytes())).get();
- }
-
- assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
-
- // create again
- assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
- assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
- } finally {
- tearDownNoAdHocCluster();
- }
- }
-
-
-
- @Test(timeout = 60000)
- public void testTruncateStream() throws Exception {
- String name = "dlserver-truncate-stream";
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- long txid = 1;
- Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>();
- for (int s = 1; s <= 2; s++) {
- for (long i = 1; i <= 10; i++) {
- long curTxId = txid++;
- logger.debug("Write entry {} to stream {}.", curTxId, name);
- DLSN dlsn = dlClient.dlClient.write(name,
- ByteBuffer.wrap(("" + curTxId).getBytes())).get();
- txid2DLSN.put(curTxId, dlsn);
- }
- if (s == 1) {
- dlClient.dlClient.release(name).get();
- }
- }
-
- DLSN dlsnToDelete = txid2DLSN.get(11L);
- dlClient.dlClient.truncate(name, dlsnToDelete).get();
-
- DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri());
- LogReader reader = readDLM.getInputStream(1);
- int numRead = 0;
- int curTxId = 11;
- LogRecord r = reader.readNext(false);
- while (null != r) {
- int i = Integer.parseInt(new String(r.getPayload()));
- assertEquals(curTxId++, i);
- ++numRead;
- r = reader.readNext(false);
- }
- assertEquals(10, numRead);
- reader.close();
- readDLM.close();
- }
-
- @Test(timeout = 60000)
- public void testRequestDenied() throws Exception {
- String name = "request-denied";
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- AccessControlEntry ace = new AccessControlEntry();
- ace.setDenyWrite(true);
- ZooKeeperClient zkc = TestZooKeeperClientBuilder
- .newBuilder()
- .uri(getUri())
- .connectionTimeoutMs(60000)
- .sessionTimeoutMs(60000)
- .build();
- DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
- BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
- String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
- ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);
- accessControl.create(zkc);
-
- AccessControlManager acm = dlNamespace.createAccessControlManager();
- while (acm.allowWrite(name)) {
- Thread.sleep(100);
- }
-
- try {
- Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
- fail("Should fail with request denied exception");
- } catch (DLException dle) {
- assertEquals(StatusCode.REQUEST_DENIED, dle.getCode());
- }
- }
-
- @Test(timeout = 60000)
- public void testNoneStreamNameRegex() throws Exception {
- String streamNamePrefix = "none-stream-name-regex-";
- int numStreams = 5;
- Set<String> streams = new HashSet<String>();
-
- for (int i = 0; i < numStreams; i++) {
- streams.add(streamNamePrefix + i);
- }
- testStreamNameRegex(streams, ".*", streams);
- }
-
- @Test(timeout = 60000)
- public void testStreamNameRegex() throws Exception {
- String streamNamePrefix = "stream-name-regex-";
- int numStreams = 5;
- Set<String> streams = new HashSet<String>();
- Set<String> expectedStreams = new HashSet<String>();
- String streamNameRegex = streamNamePrefix + "1";
-
- for (int i = 0; i < numStreams; i++) {
- streams.add(streamNamePrefix + i);
- }
- expectedStreams.add(streamNamePrefix + "1");
-
- testStreamNameRegex(streams, streamNameRegex, expectedStreams);
- }
-
- private void testStreamNameRegex(Set<String> streams, String streamNameRegex,
- Set<String> expectedStreams)
- throws Exception {
- for (String streamName : streams) {
- dlClient.routingService.addHost(streamName, dlServer.getAddress());
- Await.result(dlClient.dlClient.write(streamName,
- ByteBuffer.wrap(streamName.getBytes(UTF_8))));
- }
-
- DLClient client = createDistributedLogClient(
- "test-stream-name-regex",
- streamNameRegex,
- Optional.<String>absent());
- try {
- client.routingService.addHost("unknown", dlServer.getAddress());
- client.handshake();
- Map<SocketAddress, Set<String>> distribution =
- client.dlClient.getStreamOwnershipDistribution();
- assertEquals(1, distribution.size());
- Set<String> cachedStreams = distribution.values().iterator().next();
- assertNotNull(cachedStreams);
- assertEquals(expectedStreams.size(), cachedStreams.size());
-
- for (String streamName : cachedStreams) {
- assertTrue(expectedStreams.contains(streamName));
- }
- } finally {
- client.shutdown();
- }
- }
-
- @Test(timeout = 60000)
- public void testReleaseStream() throws Exception {
- String name = "dlserver-release-stream";
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
-
- Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
- checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
-
- // release the stream
- Await.result(dlClient.dlClient.release(name));
- checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
- }
-
- protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize,
- String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) {
- Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
- assertEquals(expectedNumProxiesInClient, distribution.size());
-
- if (expectedNumProxiesInClient > 0) {
- Map.Entry<SocketAddress, Set<String>> localEntry =
- distribution.entrySet().iterator().next();
- assertEquals(owner, localEntry.getKey());
- assertEquals(expectedClientCacheSize, localEntry.getValue().size());
- assertEquals(existedInClient, localEntry.getValue().contains(name));
- }
-
-
- StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
- Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
- Set<String> acquiredStreams = streamManager.getCachedStreams().keySet();
-
- assertEquals(expectedServerCacheSize, cachedStreams.size());
- assertEquals(existedInServer, cachedStreams.contains(name));
- assertEquals(expectedServerCacheSize, acquiredStreams.size());
- assertEquals(existedInServer, acquiredStreams.contains(name));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
deleted file mode 100644
index c7ae960..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.fail;
-
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.util.Await;
-import java.nio.ByteBuffer;
-import org.junit.Test;
-
-/**
- * Test the server with client side routing.
- */
-public class TestDistributedLogServerClientRouting extends TestDistributedLogServerBase {
-
- public TestDistributedLogServerClientRouting() {
- super(true);
- }
-
- @Test(timeout = 60000)
- public void testAcceptNewStream() throws Exception {
- String name = "dlserver-accept-new-stream";
-
- dlClient.routingService.addHost(name, dlServer.getAddress());
- dlClient.routingService.setAllowRetrySameHost(false);
-
- Await.result(dlClient.dlClient.setAcceptNewStream(false));
-
- try {
- Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
- fail("Should fail because the proxy couldn't accept new stream");
- } catch (NoBrokersAvailableException nbae) {
- // expected
- }
- checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
-
- Await.result(dlServer.dlServer.getLeft().setAcceptNewStream(true));
- Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
- checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
deleted file mode 100644
index 12416a3..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-/**
- * Test the server with client side routing.
- */
-public class TestDistributedLogServerServerRouting extends TestDistributedLogServerBase {
-
- public TestDistributedLogServerServerRouting() {
- super(false);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
deleted file mode 100644
index e5d75c2..0000000
--- a/distributedlog-service/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
+++ /dev/null
@@ -1,833 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.TestDistributedLogBase;
-import org.apache.distributedlog.acl.DefaultAccessControlManager;
-import org.apache.distributedlog.client.routing.LocalRoutingService;
-import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import org.apache.distributedlog.exceptions.StreamUnavailableException;
-import org.apache.distributedlog.service.config.NullStreamConfigProvider;
-import org.apache.distributedlog.service.config.ServerConfiguration;
-import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
-import org.apache.distributedlog.service.stream.Stream;
-import org.apache.distributedlog.service.stream.StreamImpl;
-import org.apache.distributedlog.service.stream.StreamImpl.StreamStatus;
-import org.apache.distributedlog.service.stream.StreamManagerImpl;
-import org.apache.distributedlog.service.stream.WriteOp;
-import org.apache.distributedlog.service.streamset.DelimiterStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.ProtocolUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.feature.SettableFeature;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test Case for DistributedLog Service.
- */
-public class TestDistributedLogService extends TestDistributedLogBase {
-
- private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class);
-
- @Rule
- public TestName testName = new TestName();
-
- private ServerConfiguration serverConf;
- private DistributedLogConfiguration dlConf;
- private URI uri;
- private final CountDownLatch latch = new CountDownLatch(1);
- private DistributedLogServiceImpl service;
-
- @Before
- @Override
- public void setup() throws Exception {
- super.setup();
- dlConf = new DistributedLogConfiguration();
- dlConf.addConfiguration(conf);
- dlConf.setLockTimeout(0)
- .setOutputBufferSize(0)
- .setPeriodicFlushFrequencyMilliSeconds(10)
- .setSchedulerShutdownTimeoutMs(100);
- serverConf = newLocalServerConf();
- uri = createDLMURI("/" + testName.getMethodName());
- ensureURICreated(uri);
- service = createService(serverConf, dlConf, latch);
- }
-
- @After
- @Override
- public void teardown() throws Exception {
- if (null != service) {
- service.shutdown();
- }
- super.teardown();
- }
-
- private DistributedLogConfiguration newLocalConf() {
- DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
- confLocal.addConfiguration(dlConf);
- return confLocal;
- }
-
- private ServerConfiguration newLocalServerConf() {
- ServerConfiguration serverConf = new ServerConfiguration();
- serverConf.loadConf(dlConf);
- serverConf.setServerThreads(1);
- return serverConf;
- }
-
- private DistributedLogServiceImpl createService(
- ServerConfiguration serverConf,
- DistributedLogConfiguration dlConf) throws Exception {
- return createService(serverConf, dlConf, new CountDownLatch(1));
- }
-
- private DistributedLogServiceImpl createService(
- ServerConfiguration serverConf,
- DistributedLogConfiguration dlConf,
- CountDownLatch latch) throws Exception {
- // Build the stream partition converter
- StreamPartitionConverter converter;
- try {
- converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
- } catch (ConfigurationException e) {
- logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
- IdentityStreamPartitionConverter.class.getName());
- converter = new IdentityStreamPartitionConverter();
- }
- return new DistributedLogServiceImpl(
- serverConf,
- dlConf,
- ConfUtils.getConstDynConf(dlConf),
- new NullStreamConfigProvider(),
- uri,
- converter,
- new LocalRoutingService(),
- NullStatsLogger.INSTANCE,
- NullStatsLogger.INSTANCE,
- latch,
- new EqualLoadAppraiser());
- }
-
- private StreamImpl createUnstartedStream(DistributedLogServiceImpl service,
- String name) throws Exception {
- StreamImpl stream = (StreamImpl) service.newStream(name);
- stream.initialize();
- return stream;
- }
-
- private ByteBuffer createRecord(long txid) {
- return ByteBuffer.wrap(("record-" + txid).getBytes(UTF_8));
- }
-
- private WriteOp createWriteOp(DistributedLogServiceImpl service,
- String streamName,
- long txid) {
- ByteBuffer data = createRecord(txid);
- return service.newWriteOp(streamName, data, null);
- }
-
- @Test(timeout = 60000)
- public void testAcquireStreams() throws Exception {
- String streamName = testName.getMethodName();
- StreamImpl s0 = createUnstartedStream(service, streamName);
- ServerConfiguration serverConf1 = new ServerConfiguration();
- serverConf1.addConfiguration(serverConf);
- serverConf1.setServerPort(9999);
- DistributedLogServiceImpl service1 = createService(serverConf1, dlConf);
- StreamImpl s1 = createUnstartedStream(service1, streamName);
-
- // create write ops
- WriteOp op0 = createWriteOp(service, streamName, 0L);
- s0.submit(op0);
-
- WriteOp op1 = createWriteOp(service1, streamName, 1L);
- s1.submit(op1);
-
- // check pending size
- assertEquals("Write Op 0 should be pending in service 0",
- 1, s0.numPendingOps());
- assertEquals("Write Op 1 should be pending in service 1",
- 1, s1.numPendingOps());
-
- // start acquiring s0
- s0.start();
- WriteResponse wr0 = Await.result(op0.result());
- assertEquals("Op 0 should succeed",
- StatusCode.SUCCESS, wr0.getHeader().getCode());
- assertEquals("Service 0 should acquire stream",
- StreamStatus.INITIALIZED, s0.getStatus());
- assertNotNull(s0.getManager());
- assertNotNull(s0.getWriter());
- assertNull(s0.getLastException());
-
- // start acquiring s1
- s1.start();
- WriteResponse wr1 = Await.result(op1.result());
- assertEquals("Op 1 should fail",
- StatusCode.FOUND, wr1.getHeader().getCode());
- // the stream will be set to ERROR and then be closed.
- assertTrue("Service 1 should be in unavailable state",
- StreamStatus.isUnavailable(s1.getStatus()));
- assertNotNull(s1.getManager());
- assertNull(s1.getWriter());
- assertNotNull(s1.getLastException());
- assertTrue(s1.getLastException() instanceof OwnershipAcquireFailedException);
-
- service1.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testAcquireStreamsWhenExceedMaxCachedPartitions() throws Exception {
- String streamName = testName.getMethodName() + "_0000";
-
- DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
- confLocal.addConfiguration(dlConf);
- confLocal.setMaxCachedPartitionsPerProxy(1);
-
- ServerConfiguration serverConfLocal = new ServerConfiguration();
- serverConfLocal.addConfiguration(serverConf);
- serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class);
-
- DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal);
- Stream stream = serviceLocal.getLogWriter(streamName);
-
- // stream is cached
- assertNotNull(stream);
- assertEquals(1, serviceLocal.getStreamManager().numCached());
-
- // create write ops
- WriteOp op0 = createWriteOp(service, streamName, 0L);
- stream.submit(op0);
- WriteResponse wr0 = Await.result(op0.result());
- assertEquals("Op 0 should succeed",
- StatusCode.SUCCESS, wr0.getHeader().getCode());
- assertEquals(1, serviceLocal.getStreamManager().numAcquired());
-
- // should fail to acquire another partition
- try {
- serviceLocal.getLogWriter(testName.getMethodName() + "_0001");
- fail("Should fail to acquire new streams");
- } catch (StreamUnavailableException sue) {
- // expected
- }
- assertEquals(1, serviceLocal.getStreamManager().numCached());
- assertEquals(1, serviceLocal.getStreamManager().numAcquired());
-
- // should be able to acquire partitions from other streams
- String anotherStreamName = testName.getMethodName() + "-another_0001";
- Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName);
- assertNotNull(anotherStream);
- assertEquals(2, serviceLocal.getStreamManager().numCached());
-
- // create write ops
- WriteOp op1 = createWriteOp(service, anotherStreamName, 0L);
- anotherStream.submit(op1);
- WriteResponse wr1 = Await.result(op1.result());
- assertEquals("Op 1 should succeed",
- StatusCode.SUCCESS, wr1.getHeader().getCode());
- assertEquals(2, serviceLocal.getStreamManager().numAcquired());
- }
-
- @Test(timeout = 60000)
- public void testAcquireStreamsWhenExceedMaxAcquiredPartitions() throws Exception {
- String streamName = testName.getMethodName() + "_0000";
-
- DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
- confLocal.addConfiguration(dlConf);
- confLocal.setMaxCachedPartitionsPerProxy(-1);
- confLocal.setMaxAcquiredPartitionsPerProxy(1);
-
- ServerConfiguration serverConfLocal = new ServerConfiguration();
- serverConfLocal.addConfiguration(serverConf);
- serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class);
-
- DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal);
- Stream stream = serviceLocal.getLogWriter(streamName);
-
- // stream is cached
- assertNotNull(stream);
- assertEquals(1, serviceLocal.getStreamManager().numCached());
-
- // create write ops
- WriteOp op0 = createWriteOp(service, streamName, 0L);
- stream.submit(op0);
- WriteResponse wr0 = Await.result(op0.result());
- assertEquals("Op 0 should succeed",
- StatusCode.SUCCESS, wr0.getHeader().getCode());
- assertEquals(1, serviceLocal.getStreamManager().numAcquired());
-
- // should be able to cache partitions from same stream
- String anotherStreamName = testName.getMethodName() + "_0001";
- Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName);
- assertNotNull(anotherStream);
- assertEquals(2, serviceLocal.getStreamManager().numCached());
-
- // create write ops
- WriteOp op1 = createWriteOp(service, anotherStreamName, 0L);
- anotherStream.submit(op1);
- WriteResponse wr1 = Await.result(op1.result());
- assertEquals("Op 1 should fail",
- StatusCode.STREAM_UNAVAILABLE, wr1.getHeader().getCode());
- assertEquals(1, serviceLocal.getStreamManager().numAcquired());
- }
-
- @Test(timeout = 60000)
- public void testCloseShouldErrorOutPendingOps() throws Exception {
- String streamName = testName.getMethodName();
- StreamImpl s = createUnstartedStream(service, streamName);
-
- int numWrites = 10;
- List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
- for (int i = 0; i < numWrites; i++) {
- WriteOp op = createWriteOp(service, streamName, i);
- s.submit(op);
- futureList.add(op.result());
- }
- assertEquals(numWrites, s.numPendingOps());
- Await.result(s.requestClose("close stream"));
- assertEquals("Stream " + streamName + " is set to " + StreamStatus.CLOSED,
- StreamStatus.CLOSED, s.getStatus());
- for (int i = 0; i < numWrites; i++) {
- Future<WriteResponse> future = futureList.get(i);
- WriteResponse wr = Await.result(future);
- assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE,
- StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode());
- }
- }
-
- @Test(timeout = 60000)
- public void testCloseTwice() throws Exception {
- String streamName = testName.getMethodName();
- StreamImpl s = createUnstartedStream(service, streamName);
-
- int numWrites = 10;
- List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
- for (int i = 0; i < numWrites; i++) {
- WriteOp op = createWriteOp(service, streamName, i);
- s.submit(op);
- futureList.add(op.result());
- }
- assertEquals(numWrites, s.numPendingOps());
-
- Future<Void> closeFuture0 = s.requestClose("close 0");
- assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
- StreamStatus.CLOSING == s.getStatus()
- || StreamStatus.CLOSED == s.getStatus());
- Future<Void> closeFuture1 = s.requestClose("close 1");
- assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
- StreamStatus.CLOSING == s.getStatus()
- || StreamStatus.CLOSED == s.getStatus());
-
- Await.result(closeFuture0);
- assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
- StreamStatus.CLOSED, s.getStatus());
- Await.result(closeFuture1);
- assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
- StreamStatus.CLOSED, s.getStatus());
-
- for (int i = 0; i < numWrites; i++) {
- Future<WriteResponse> future = futureList.get(i);
- WriteResponse wr = Await.result(future);
- assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE,
- StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode());
- }
- }
-
- @Test(timeout = 60000)
- public void testFailRequestsDuringClosing() throws Exception {
- String streamName = testName.getMethodName();
- StreamImpl s = createUnstartedStream(service, streamName);
-
- Future<Void> closeFuture = s.requestClose("close");
- assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
- StreamStatus.CLOSING == s.getStatus()
- || StreamStatus.CLOSED == s.getStatus());
- WriteOp op1 = createWriteOp(service, streamName, 0L);
- s.submit(op1);
- WriteResponse response1 = Await.result(op1.result());
- assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closing",
- StatusCode.STREAM_UNAVAILABLE, response1.getHeader().getCode());
-
- Await.result(closeFuture);
- assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
- StreamStatus.CLOSED, s.getStatus());
- WriteOp op2 = createWriteOp(service, streamName, 1L);
- s.submit(op2);
- WriteResponse response2 = Await.result(op2.result());
- assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closed",
- StatusCode.STREAM_UNAVAILABLE, response2.getHeader().getCode());
- }
-
- @Test(timeout = 60000)
- public void testServiceTimeout() throws Exception {
- DistributedLogConfiguration confLocal = newLocalConf();
- confLocal.setOutputBufferSize(Integer.MAX_VALUE)
- .setImmediateFlushEnabled(false)
- .setPeriodicFlushFrequencyMilliSeconds(0);
- ServerConfiguration serverConfLocal = newLocalServerConf();
- serverConfLocal.addConfiguration(serverConf);
- serverConfLocal.setServiceTimeoutMs(200)
- .setStreamProbationTimeoutMs(100);
- String streamName = testName.getMethodName();
- // create a new service with 200ms timeout
- DistributedLogServiceImpl localService = createService(serverConfLocal, confLocal);
- StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
-
- int numWrites = 10;
- List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
- for (int i = 0; i < numWrites; i++) {
- futureList.add(localService.write(streamName, createRecord(i)));
- }
-
- assertTrue("Stream " + streamName + " should be cached",
- streamManager.getCachedStreams().containsKey(streamName));
-
- StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName);
- // the stream should be set CLOSING
- while (StreamStatus.CLOSING != s.getStatus()
- && StreamStatus.CLOSED != s.getStatus()) {
- TimeUnit.MILLISECONDS.sleep(20);
- }
- assertNotNull("Writer should be initialized", s.getWriter());
- assertNull("No exception should be thrown", s.getLastException());
- Future<Void> closeFuture = s.getCloseFuture();
- Await.result(closeFuture);
- for (int i = 0; i < numWrites; i++) {
- assertTrue("Write should not fail before closing",
- futureList.get(i).isDefined());
- WriteResponse response = Await.result(futureList.get(i));
- assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION,
- StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
- || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
- || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
- }
-
- while (streamManager.getCachedStreams().containsKey(streamName)) {
- TimeUnit.MILLISECONDS.sleep(20);
- }
-
- assertFalse("Stream should be removed from cache",
- streamManager.getCachedStreams().containsKey(streamName));
- assertFalse("Stream should be removed from acquired cache",
- streamManager.getAcquiredStreams().containsKey(streamName));
-
- localService.shutdown();
- }
-
- private DistributedLogServiceImpl createConfiguredLocalService() throws Exception {
- DistributedLogConfiguration confLocal = newLocalConf();
- confLocal.setOutputBufferSize(0)
- .setImmediateFlushEnabled(true)
- .setPeriodicFlushFrequencyMilliSeconds(0);
- return createService(serverConf, confLocal);
- }
-
- private ByteBuffer getTestDataBuffer() {
- return ByteBuffer.wrap("test-data".getBytes());
- }
-
- @Test(timeout = 60000)
- public void testNonDurableWrite() throws Exception {
- DistributedLogConfiguration confLocal = newLocalConf();
- confLocal.setOutputBufferSize(Integer.MAX_VALUE)
- .setImmediateFlushEnabled(false)
- .setPeriodicFlushFrequencyMilliSeconds(0)
- .setDurableWriteEnabled(false);
- ServerConfiguration serverConfLocal = new ServerConfiguration();
- serverConfLocal.addConfiguration(serverConf);
- serverConfLocal.enableDurableWrite(false);
- serverConfLocal.setServiceTimeoutMs(Integer.MAX_VALUE)
- .setStreamProbationTimeoutMs(Integer.MAX_VALUE);
- String streamName = testName.getMethodName();
- DistributedLogServiceImpl localService =
- createService(serverConfLocal, confLocal);
- StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
-
- int numWrites = 10;
- List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>();
- for (int i = 0; i < numWrites; i++) {
- futureList.add(localService.write(streamName, createRecord(i)));
- }
- assertTrue("Stream " + streamName + " should be cached",
- streamManager.getCachedStreams().containsKey(streamName));
- List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList));
- for (WriteResponse wr : resultList) {
- assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn()));
- }
-
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testWriteOpNoChecksum() throws Exception {
- DistributedLogServiceImpl localService = createConfiguredLocalService();
- WriteContext ctx = new WriteContext();
- Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx);
- WriteResponse resp = Await.result(result);
- assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testTruncateOpNoChecksum() throws Exception {
- DistributedLogServiceImpl localService = createConfiguredLocalService();
- WriteContext ctx = new WriteContext();
- Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
- WriteResponse resp = Await.result(result);
- assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testStreamOpNoChecksum() throws Exception {
- DistributedLogServiceImpl localService = createConfiguredLocalService();
- WriteContext ctx = new WriteContext();
- HeartbeatOptions option = new HeartbeatOptions();
- option.setSendHeartBeatToReader(true);
-
- // hearbeat to acquire the stream and then release the stream
- Future<WriteResponse> result = localService.heartbeatWithOptions("test", ctx, option);
- WriteResponse resp = Await.result(result);
- assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
- result = localService.release("test", ctx);
- resp = Await.result(result);
- assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-
- // heartbeat to acquire the stream and then delete the stream
- result = localService.heartbeatWithOptions("test", ctx, option);
- resp = Await.result(result);
- assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
- result = localService.delete("test", ctx);
- resp = Await.result(result);
- assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
-
- // shutdown the local service
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testWriteOpChecksumBadChecksum() throws Exception {
- DistributedLogServiceImpl localService = createConfiguredLocalService();
- WriteContext ctx = new WriteContext().setCrc32(999);
- Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx);
- WriteResponse resp = Await.result(result);
- assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testWriteOpChecksumBadStream() throws Exception {
- DistributedLogServiceImpl localService = createConfiguredLocalService();
- WriteContext ctx = new WriteContext().setCrc32(
- ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array()));
- Future<WriteResponse> result = localService.writeWithContext("test1", getTestDataBuffer(), ctx);
- WriteResponse resp = Await.result(result);
- assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testWriteOpChecksumBadData() throws Exception {
- DistributedLogServiceImpl localService = createConfiguredLocalService();
- ByteBuffer buffer = getTestDataBuffer();
- WriteContext ctx = new WriteContext().setCrc32(
- ProtocolUtils.writeOpCRC32("test", buffer.array()));
-
- // Overwrite 1 byte to corrupt data.
- buffer.put(1, (byte) 0xab);
- Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx);
- WriteResponse resp = Await.result(result);
- assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testStreamOpChecksumBadChecksum() throws Exception {
- DistributedLogServiceImpl localService = createConfiguredLocalService();
- WriteContext ctx = new WriteContext().setCrc32(999);
- Future<WriteResponse> result = localService.heartbeat("test", ctx);
- WriteResponse resp = Await.result(result);
- assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
- result = localService.release("test", ctx);
- resp = Await.result(result);
- assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
- result = localService.delete("test", ctx);
- resp = Await.result(result);
- assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testTruncateOpChecksumBadChecksum() throws Exception {
- DistributedLogServiceImpl localService = createConfiguredLocalService();
- WriteContext ctx = new WriteContext().setCrc32(999);
- Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
- WriteResponse resp = Await.result(result);
- assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
- localService.shutdown();
- }
-
- private WriteOp getWriteOp(String name, SettableFeature disabledFeature, Long checksum) {
- return new WriteOp(name,
- ByteBuffer.wrap("test".getBytes()),
- new NullStatsLogger(),
- new NullStatsLogger(),
- new IdentityStreamPartitionConverter(),
- new ServerConfiguration(),
- (byte) 0,
- checksum,
- false,
- disabledFeature,
- DefaultAccessControlManager.INSTANCE);
- }
-
- @Test(timeout = 60000)
- public void testStreamOpBadChecksumWithChecksumDisabled() throws Exception {
- String streamName = testName.getMethodName();
-
- SettableFeature disabledFeature = new SettableFeature("", 0);
-
- WriteOp writeOp0 = getWriteOp(streamName, disabledFeature, 919191L);
- WriteOp writeOp1 = getWriteOp(streamName, disabledFeature, 919191L);
-
- try {
- writeOp0.preExecute();
- fail("should have thrown");
- } catch (Exception ex) {
- }
-
- disabledFeature.set(1);
- writeOp1.preExecute();
- }
-
- @Test(timeout = 60000)
- public void testStreamOpGoodChecksumWithChecksumDisabled() throws Exception {
- String streamName = testName.getMethodName();
-
- SettableFeature disabledFeature = new SettableFeature("", 1);
- WriteOp writeOp0 = getWriteOp(
- streamName,
- disabledFeature,
- ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
- WriteOp writeOp1 = getWriteOp(
- streamName,
- disabledFeature,
- ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
-
- writeOp0.preExecute();
- disabledFeature.set(0);
- writeOp1.preExecute();
- }
-
- @Test(timeout = 60000)
- public void testCloseStreamsShouldFlush() throws Exception {
- DistributedLogConfiguration confLocal = newLocalConf();
- confLocal.setOutputBufferSize(Integer.MAX_VALUE)
- .setImmediateFlushEnabled(false)
- .setPeriodicFlushFrequencyMilliSeconds(0);
-
- String streamNamePrefix = testName.getMethodName();
- DistributedLogServiceImpl localService = createService(serverConf, confLocal);
- StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
-
- int numStreams = 10;
- int numWrites = 10;
- List<Future<WriteResponse>> futureList =
- Lists.newArrayListWithExpectedSize(numStreams * numWrites);
- for (int i = 0; i < numStreams; i++) {
- String streamName = streamNamePrefix + "-" + i;
- HeartbeatOptions hbOptions = new HeartbeatOptions();
- hbOptions.setSendHeartBeatToReader(true);
- // make sure the first log segment of each stream created
- FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
- for (int j = 0; j < numWrites; j++) {
- futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
- }
- }
-
- assertEquals("There should be " + numStreams + " streams in cache",
- numStreams, streamManager.getCachedStreams().size());
- while (streamManager.getAcquiredStreams().size() < numStreams) {
- TimeUnit.MILLISECONDS.sleep(20);
- }
-
- Future<List<Void>> closeResult = localService.closeStreams();
- List<Void> closedStreams = Await.result(closeResult);
- assertEquals("There should be " + numStreams + " streams closed",
- numStreams, closedStreams.size());
- // all writes should be flushed
- for (Future<WriteResponse> future : futureList) {
- WriteResponse response = Await.result(future);
- assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(),
- StatusCode.SUCCESS == response.getHeader().getCode()
- || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
- || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode());
- }
- assertTrue("There should be no streams in the cache",
- streamManager.getCachedStreams().isEmpty());
- assertTrue("There should be no streams in the acquired cache",
- streamManager.getAcquiredStreams().isEmpty());
-
- localService.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testCloseStreamsShouldAbort() throws Exception {
- DistributedLogConfiguration confLocal = newLocalConf();
- confLocal.setOutputBufferSize(Integer.MAX_VALUE)
- .setImmediateFlushEnabled(false)
- .setPeriodicFlushFrequencyMilliSeconds(0);
-
- String streamNamePrefix = testName.getMethodName();
- DistributedLogServiceImpl localService = createService(serverConf, confLocal);
- StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
-
- int numStreams = 10;
- int numWrites = 10;
- List<Future<WriteResponse>> futureList =
- Lists.newArrayListWithExpectedSize(numStreams * numWrites);
- for (int i = 0; i < numStreams; i++) {
- String streamName = streamNamePrefix + "-" + i;
- HeartbeatOptions hbOptions = new HeartbeatOptions();
- hbOptions.setSendHeartBeatToReader(true);
- // make sure the first log segment of each stream created
- FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
- for (int j = 0; j < numWrites; j++) {
- futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
- }
- }
-
- assertEquals("There should be " + numStreams + " streams in cache",
- numStreams, streamManager.getCachedStreams().size());
- while (streamManager.getAcquiredStreams().size() < numStreams) {
- TimeUnit.MILLISECONDS.sleep(20);
- }
-
- for (Stream s : streamManager.getAcquiredStreams().values()) {
- StreamImpl stream = (StreamImpl) s;
- stream.setStatus(StreamStatus.ERROR);
- }
-
- Future<List<Void>> closeResult = localService.closeStreams();
- List<Void> closedStreams = Await.result(closeResult);
- assertEquals("There should be " + numStreams + " streams closed",
- numStreams, closedStreams.size());
- // all writes should be flushed
- for (Future<WriteResponse> future : futureList) {
- WriteResponse response = Await.result(future);
- assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : "
- + response.getHeader().getCode(),
- StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
- || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
- || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
- }
- // acquired streams should all been removed after we close them
- assertTrue("There should be no streams in the acquired cache",
- streamManager.getAcquiredStreams().isEmpty());
- localService.shutdown();
- // cached streams wouldn't be removed immediately after streams are closed
- // but they should be removed after we shutdown the service
- assertTrue("There should be no streams in the cache after shutting down the service",
- streamManager.getCachedStreams().isEmpty());
- }
-
- @Test(timeout = 60000)
- public void testShutdown() throws Exception {
- service.shutdown();
- StreamManagerImpl streamManager = (StreamManagerImpl) service.getStreamManager();
- WriteResponse response =
- Await.result(service.write(testName.getMethodName(), createRecord(0L)));
- assertEquals("Write should fail with " + StatusCode.SERVICE_UNAVAILABLE,
- StatusCode.SERVICE_UNAVAILABLE, response.getHeader().getCode());
- assertTrue("There should be no streams created after shutdown",
- streamManager.getCachedStreams().isEmpty());
- assertTrue("There should be no streams acquired after shutdown",
- streamManager.getAcquiredStreams().isEmpty());
- }
-
- @Test(timeout = 60000)
- public void testGetOwner() throws Exception {
- ((LocalRoutingService) service.getRoutingService())
- .addHost("stream-0", service.getServiceAddress().getSocketAddress())
- .setAllowRetrySameHost(false);
-
- service.startPlacementPolicy();
-
- WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
- assertEquals(StatusCode.FOUND, response.getHeader().getCode());
- assertEquals(service.getServiceAddress().toString(),
- response.getHeader().getLocation());
-
- // service cache "stream-2"
- StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false);
- // create write ops to stream-2 to make service acquire the stream
- WriteOp op = createWriteOp(service, "stream-2", 0L);
- stream.submit(op);
- stream.start();
- WriteResponse wr = Await.result(op.result());
- assertEquals("Op should succeed",
- StatusCode.SUCCESS, wr.getHeader().getCode());
- assertEquals("Service should acquire stream",
- StreamStatus.INITIALIZED, stream.getStatus());
- assertNotNull(stream.getManager());
- assertNotNull(stream.getWriter());
- assertNull(stream.getLastException());
-
- // the stream is acquired
- response = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
- assertEquals(StatusCode.FOUND, response.getHeader().getCode());
- assertEquals(service.getServiceAddress().toString(),
- response.getHeader().getLocation());
- }
-
-}