You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:19 UTC
[12/30] incubator-distributedlog git commit: DL-205: Remove StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
new file mode 100644
index 0000000..58b5b2a
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.client.DistributedLogClientImpl;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.routing.LocalRoutingService;
+import org.apache.distributedlog.client.routing.RegionsRoutingService;
+import org.apache.distributedlog.service.DistributedLogCluster.DLServer;
+import org.apache.distributedlog.service.stream.StreamManager;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Base test case for distributedlog servers.
+ */
+public abstract class DistributedLogServerTestCase {
+
+ // Configuration for the default cluster: lock timeout 10, output buffer size 0,
+ // periodic flush frequency 10 ms.
+ protected static DistributedLogConfiguration conf =
+ new DistributedLogConfiguration().setLockTimeout(10)
+ .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
+ // Same settings as conf, but with createStreamIfNotExists turned off.
+ protected static DistributedLogConfiguration noAdHocConf =
+ new DistributedLogConfiguration().setLockTimeout(10).setCreateStreamIfNotExists(false)
+ .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
+ // Cluster created and started once per test class by setupCluster().
+ protected static DistributedLogCluster dlCluster;
+ // Cluster created on demand by setupNoAdHocCluster(); null until that is called.
+ protected static DistributedLogCluster noAdHocCluster;
+
+ /**
+  * Test-only wrapper bundling a {@link LocalRoutingService}, the client builder
+  * configured with it, and the client produced by that builder.
+  */
+ protected static class DLClient {
+     public final LocalRoutingService routingService;
+     public DistributedLogClientBuilder dlClientBuilder;
+     public final DistributedLogClientImpl dlClient;
+
+     /**
+      * Creates the routing service, configures the builder and builds the client.
+      *
+      * @param name client name; also used to derive the finagle client id
+      * @param streamNameRegex stream name regex handed to the client builder
+      * @param serverSideRoutingFinagleName when present, set on the builder as the
+      *        server routing service finagle name
+      */
+     protected DLClient(String name,
+                        String streamNameRegex,
+                        Optional<String> serverSideRoutingFinagleName) {
+         this.routingService = LocalRoutingService.newBuilder().build();
+         DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+                 .name(name)
+                 .clientId(ClientId$.MODULE$.apply(name))
+                 .routingService(this.routingService)
+                 .streamNameRegex(streamNameRegex)
+                 .handshakeWithClientInfo(true)
+                 .clientBuilder(ClientBuilder.get()
+                         .hostConnectionLimit(1)
+                         .connectionTimeout(Duration.fromSeconds(1))
+                         .requestTimeout(Duration.fromSeconds(60)));
+         if (serverSideRoutingFinagleName.isPresent()) {
+             builder = builder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get());
+         }
+         this.dlClientBuilder = builder;
+         this.dlClient = (DistributedLogClientImpl) builder.build();
+     }
+
+     /** Delegates to the wrapped client's handshake. */
+     public void handshake() {
+         this.dlClient.handshake();
+     }
+
+     /** Closes the wrapped client. */
+     public void shutdown() {
+         this.dlClient.close();
+     }
+ }
+
+ /**
+ * A distributedlog client wrapper that talks to two regions.
+ *
+ * <p>Keeps one {@link LocalRoutingService} per region and combines them through a
+ * {@link RegionsRoutingService} backed by a {@link DefaultRegionResolver}.
+ */
+ protected static class TwoRegionDLClient {
+
+ public final LocalRoutingService localRoutingService;
+ public final LocalRoutingService remoteRoutingService;
+ public final DistributedLogClientBuilder dlClientBuilder;
+ public final DistributedLogClientImpl dlClient;
+
+ /**
+ * Builds a client routed through both the local and the remote routing service.
+ *
+ * @param name client name; also used to derive the finagle client id
+ * @param regionMap address-to-region mapping handed to the region resolver
+ */
+ protected TwoRegionDLClient(String name, Map<SocketAddress, String> regionMap) {
+ localRoutingService = new LocalRoutingService();
+ remoteRoutingService = new LocalRoutingService();
+ RegionsRoutingService regionsRoutingService =
+ RegionsRoutingService.of(new DefaultRegionResolver(regionMap),
+ localRoutingService, remoteRoutingService);
+ // Accepts every stream name (".*"), allows 2 redirects, and uses a 10 second request timeout.
+ dlClientBuilder = DistributedLogClientBuilder.newBuilder()
+ .name(name)
+ .clientId(ClientId$.MODULE$.apply(name))
+ .routingService(regionsRoutingService)
+ .streamNameRegex(".*")
+ .handshakeWithClientInfo(true)
+ .maxRedirects(2)
+ .clientBuilder(ClientBuilder.get()
+ .hostConnectionLimit(1)
+ .connectionTimeout(Duration.fromSeconds(1))
+ .requestTimeout(Duration.fromSeconds(10)));
+ dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
+ }
+
+ /** Closes the wrapped client. */
+ public void shutdown() {
+ dlClient.close();
+ }
+ }
+
+ // When false, clients are given a server-side routing finagle name pointing at the test server.
+ private final boolean clientSideRouting;
+ // Server and client created in setup() and shut down in teardown().
+ protected DLServer dlServer;
+ protected DLClient dlClient;
+ // Server and client created by setupNoAdHocCluster(); null unless a test calls it.
+ protected DLServer noAdHocServer;
+ protected DLClient noAdHocClient;
+
+ public static DistributedLogCluster createCluster(DistributedLogConfiguration conf) throws Exception {
+ return DistributedLogCluster.newBuilder()
+ .numBookies(3)
+ .shouldStartZK(true)
+ .zkServers("127.0.0.1")
+ .shouldStartProxy(false)
+ .dlConf(conf)
+ .bkConf(DLMTestUtil.loadTestBkConf())
+ .build();
+ }
+
+ /**
+ * Creates and starts the shared cluster once per test class, using {@code conf}.
+ */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ dlCluster = createCluster(conf);
+ dlCluster.start();
+ }
+
+ /**
+ * Starts a second cluster configured with {@code noAdHocConf}, plus a server on port 7002
+ * and a client for it. Not annotated as a JUnit fixture; tests call it explicitly.
+ */
+ public void setupNoAdHocCluster() throws Exception {
+ // Overwrites the static field; any cluster stored by an earlier call is not stopped here.
+ noAdHocCluster = createCluster(noAdHocConf);
+ noAdHocCluster.start();
+ noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false);
+ Optional<String> serverSideRoutingFinagleName = Optional.absent();
+ // Without client-side routing, point the client at the server through an "inet!" finagle name.
+ if (!clientSideRouting) {
+ serverSideRoutingFinagleName =
+ Optional.of("inet!" + DLSocketAddress.toString(noAdHocServer.getAddress()));
+ }
+ noAdHocClient = createDistributedLogClient("no-ad-hoc-client", serverSideRoutingFinagleName);
+ }
+
+ /**
+  * Releases everything created by {@link #setupNoAdHocCluster()}: the client, the server
+  * and the cluster itself.
+  *
+  * <p>The cluster is stopped here as well because each call to
+  * {@link #setupNoAdHocCluster()} overwrites the static field; leaving the stop to
+  * {@link #teardownCluster()} would only stop the last cluster created. Each resource is
+  * released even if an earlier one fails to shut down, and the fields are cleared so a
+  * later teardown does not touch them again.
+  *
+  * @throws Exception if shutting down the client or server, or stopping the cluster, fails
+  */
+ public void tearDownNoAdHocCluster() throws Exception {
+     try {
+         if (null != noAdHocClient) {
+             noAdHocClient.shutdown();
+             noAdHocClient = null;
+         }
+     } finally {
+         try {
+             if (null != noAdHocServer) {
+                 noAdHocServer.shutdown();
+                 noAdHocServer = null;
+             }
+         } finally {
+             if (null != noAdHocCluster) {
+                 noAdHocCluster.stop();
+                 noAdHocCluster = null;
+             }
+         }
+     }
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ if (null != dlCluster) {
+ dlCluster.stop();
+ }
+ if (null != noAdHocCluster) {
+ noAdHocCluster.stop();
+ }
+ }
+
+ /** Returns the URI of the shared cluster started in {@code setupCluster()}. */
+ protected static URI getUri() {
+ return dlCluster.getUri();
+ }
+
+ /**
+ * @param clientSideRouting when false, clients created by the fixtures are given a
+ * server-side routing finagle name for the test server
+ */
+ protected DistributedLogServerTestCase(boolean clientSideRouting) {
+ this.clientSideRouting = clientSideRouting;
+ }
+
+ /**
+ * Creates a server on port 7001 and a client named "test" before each test.
+ */
+ @Before
+ public void setup() throws Exception {
+ dlServer = createDistributedLogServer(7001);
+ Optional<String> serverSideRoutingFinagleName = Optional.absent();
+ // Without client-side routing, point the client at the server through an "inet!" finagle name.
+ if (!clientSideRouting) {
+ serverSideRoutingFinagleName =
+ Optional.of("inet!" + DLSocketAddress.toString(dlServer.getAddress()));
+ }
+ dlClient = createDistributedLogClient("test", serverSideRoutingFinagleName);
+ }
+
+ /**
+  * Shuts down the per-test client first and then the per-test server, if they were created.
+  */
+ @After
+ public void teardown() throws Exception {
+     final DLClient client = dlClient;
+     if (client != null) {
+         client.shutdown();
+     }
+     final DLServer server = dlServer;
+     if (server != null) {
+         server.shutdown();
+     }
+ }
+
+ /**
+ * Creates a server on the given port against the shared cluster, using the static {@code conf}.
+ */
+ protected DLServer createDistributedLogServer(int port) throws Exception {
+ return new DLServer(conf, dlCluster.getUri(), port, false);
+ }
+
+ /**
+ * Creates a server on the given port against the shared cluster with a caller-supplied
+ * configuration. The {@code conf} parameter shadows the static field of the same name.
+ */
+ protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port)
+ throws Exception {
+ return new DLServer(conf, dlCluster.getUri(), port, false);
+ }
+
+ /**
+ * Creates a client that accepts every stream name (regex {@code ".*"}).
+ */
+ protected DLClient createDistributedLogClient(String clientName,
+ Optional<String> serverSideRoutingFinagleName)
+ throws Exception {
+ return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName);
+ }
+
+ /**
+ * Creates a {@link DLClient} with the given name, stream name regex and optional
+ * server-side routing finagle name.
+ */
+ protected DLClient createDistributedLogClient(String clientName,
+ String streamNameRegex,
+ Optional<String> serverSideRoutingFinagleName)
+ throws Exception {
+ return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName);
+ }
+
+ /**
+ * Creates a {@link TwoRegionDLClient} with the given name and address-to-region map.
+ */
+ protected TwoRegionDLClient createTwoRegionDLClient(String clientName,
+ Map<SocketAddress, String> regionMap)
+ throws Exception {
+ return new TwoRegionDLClient(clientName, regionMap);
+ }
+
+ protected static void checkStreams(int numExpectedStreams, DLServer dlServer) {
+ StreamManager streamManager = dlServer.dlServer.getKey().getStreamManager();
+ assertEquals(numExpectedStreams, streamManager.numCached());
+ assertEquals(numExpectedStreams, streamManager.numAcquired());
+ }
+
+ protected static void checkStreams(Set<String> streams, DLServer dlServer) {
+ StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
+ Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
+ Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
+
+ assertEquals(streams.size(), cachedStreams.size());
+ assertEquals(streams.size(), acquiredStreams.size());
+ assertTrue(Sets.difference(streams, cachedStreams).isEmpty());
+ assertTrue(Sets.difference(streams, acquiredStreams).isEmpty());
+ }
+
+ /**
+  * Asserts the ownership state of a single stream on both the client and the server.
+  *
+  * @param name stream name to look for
+  * @param dlClient client whose stream ownership distribution is inspected
+  * @param dlServer server whose stream manager is inspected
+  * @param expectedNumProxiesInClient expected number of entries in the client's distribution
+  * @param expectedClientCacheSize expected number of streams under the first distribution entry
+  * @param expectedServerCacheSize expected size of the server's cached and acquired stream sets
+  * @param existedInServer whether the server's cached and acquired sets should contain {@code name}
+  * @param existedInClient whether the first distribution entry should contain {@code name}
+  */
+ protected static void checkStream(String name, DLClient dlClient, DLServer dlServer,
+                                   int expectedNumProxiesInClient, int expectedClientCacheSize,
+                                   int expectedServerCacheSize, boolean existedInServer, boolean existedInClient) {
+     Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
+     assertEquals(expectedNumProxiesInClient, distribution.size());
+
+     if (expectedNumProxiesInClient > 0) {
+         // Only the first entry is inspected; it is expected to belong to dlServer.
+         Map.Entry<SocketAddress, Set<String>> localEntry =
+                 distribution.entrySet().iterator().next();
+         assertEquals(dlServer.getAddress(), localEntry.getKey());
+         assertEquals(expectedClientCacheSize, localEntry.getValue().size());
+         assertEquals(existedInClient, localEntry.getValue().contains(name));
+     }
+
+     StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
+     Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
+     // Read the acquired streams, not the cache again, so the acquired assertions are meaningful.
+     Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
+
+     assertEquals(expectedServerCacheSize, cachedStreams.size());
+     assertEquals(existedInServer, cachedStreams.contains(name));
+     assertEquals(expectedServerCacheSize, acquiredStreams.size());
+     assertEquals(existedInServer, acquiredStreams.contains(name));
+ }
+
+ /** Returns the stream ownership distribution reported by the wrapped client. */
+ protected static Map<SocketAddress, Set<String>> getStreamOwnershipDistribution(DLClient dlClient) {
+ return dlClient.dlClient.getStreamOwnershipDistribution();
+ }
+
+ /**
+  * Flattens an ownership distribution into the set of all stream names it mentions.
+  *
+  * @param distribution per-address sets of stream names
+  * @return a new mutable set containing every stream name from every address
+  */
+ protected static Set<String> getAllStreamsFromDistribution(Map<SocketAddress, Set<String>> distribution) {
+     Set<String> union = new HashSet<String>();
+     for (Set<String> ownedStreams : distribution.values()) {
+         union.addAll(ownedStreams);
+     }
+     return union;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
new file mode 100644
index 0000000..4a5dd01
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Optional;
+import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.TestZooKeeperClientBuilder;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.client.routing.LocalRoutingService;
+import org.apache.distributedlog.exceptions.DLException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.impl.acl.ZKAccessControl;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import org.apache.distributedlog.thrift.AccessControlEntry;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.util.FailpointUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link DistributedLogServer}.
+ */
+public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ /**
+ * @param clientSideRouting routing mode forwarded to {@link DistributedLogServerTestCase}
+ */
+ protected TestDistributedLogServerBase(boolean clientSideRouting) {
+ super(clientSideRouting);
+ }
+
+ /**
+ * Writes entries 1..10 to a stream through the client and reads them back in order.
+ *
+ * <p>Currently ignored and marked flaky.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/DL-27">DL-27</a>
+ */
+ @DistributedLogAnnotations.FlakyTest
+ @Ignore
+ @Test(timeout = 60000)
+ public void testBasicWrite() throws Exception {
+ String name = "dlserver-basic-write";
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ for (long i = 1; i <= 10; i++) {
+ logger.debug("Write entry {} to stream {}.", i, name);
+ Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())));
+ }
+
+ // NOTE(review): hbOptions is configured but never passed to heartbeat() below - confirm whether
+ // it is still needed.
+ HeartbeatOptions hbOptions = new HeartbeatOptions();
+ hbOptions.setSendHeartBeatToReader(true);
+ // make sure the first log segment of each stream created
+ FutureUtils.result(dlClient.dlClient.heartbeat(name));
+
+ DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+ LogReader reader = dlm.getInputStream(1);
+ int numRead = 0;
+ LogRecord r = reader.readNext(false);
+ // Each payload is the decimal text of its 1-based position in the stream.
+ while (null != r) {
+ ++numRead;
+ int i = Integer.parseInt(new String(r.getPayload()));
+ assertEquals(numRead, i);
+ r = reader.readNext(false);
+ }
+ assertEquals(10, numRead);
+ reader.close();
+ dlm.close();
+ }
+
+ /**
+ * Sanity check to make sure both checksum flag values work.
+ *
+ * <p>Builds a client with checksum disabled and writes once, then rebuilds from the same
+ * builder with checksum enabled and writes again. A write that does not throw is a pass.
+ */
+ @Test(timeout = 60000)
+ public void testChecksumFlag() throws Exception {
+ String name = "testChecksumFlag";
+ // Uses its own routing service and client rather than the fixture's dlClient field.
+ LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
+ routingService.addHost(name, dlServer.getAddress());
+ DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
+ .name(name)
+ .clientId(ClientId$.MODULE$.apply("test"))
+ .routingService(routingService)
+ .handshakeWithClientInfo(true)
+ .clientBuilder(ClientBuilder.get()
+ .hostConnectionLimit(1)
+ .connectionTimeout(Duration.fromSeconds(1))
+ .requestTimeout(Duration.fromSeconds(60)))
+ .checksum(false);
+ // This local shadows the inherited dlClient field for the rest of the method.
+ DistributedLogClient dlClient = dlClientBuilder.build();
+ Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes())));
+ dlClient.close();
+
+ dlClient = dlClientBuilder.checksum(true).build();
+ Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes())));
+ dlClient.close();
+ }
+
+ /**
+  * Bulk-writes the payloads "1".."writeCount" to a stream named after the count, waits for
+  * every write to complete, then reads the stream back and checks order and count.
+  *
+  * @param writeCount number of records to write in a single bulk request
+  * @throws Exception if any write fails or reading back fails
+  */
+ private void runSimpleBulkWriteTest(int writeCount) throws Exception {
+     String name = String.format("dlserver-bulk-write-%d", writeCount);
+
+     dlClient.routingService.addHost(name, dlServer.getAddress());
+
+     List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount);
+     for (long i = 1; i <= writeCount; i++) {
+         writes.add(ByteBuffer.wrap(("" + i).getBytes()));
+     }
+
+     logger.debug("Write {} entries to stream {}.", writeCount, name);
+     List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+     // Expected value goes first so a failure message reports the values the right way round.
+     assertEquals(writeCount, futures.size());
+     for (Future<DLSN> future : futures) {
+         // No throw == pass.
+         Await.result(future, Duration.fromSeconds(10));
+     }
+
+     DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+     LogReader reader = dlm.getInputStream(1);
+     int numRead = 0;
+     LogRecord r = reader.readNext(false);
+     // Each payload is the decimal text of its 1-based position in the stream.
+     while (null != r) {
+         int i = Integer.parseInt(new String(r.getPayload()));
+         assertEquals(numRead + 1, i);
+         ++numRead;
+         r = reader.readNext(false);
+     }
+     assertEquals(writeCount, numRead);
+     reader.close();
+     dlm.close();
+ }
+
+ /** Runs the simple bulk write scenario with 100 records. */
+ @Test(timeout = 60000)
+ public void testBulkWrite() throws Exception {
+ runSimpleBulkWriteTest(100);
+ }
+
+ /** Runs the simple bulk write scenario with a single record. */
+ @Test(timeout = 60000)
+ public void testBulkWriteSingleWrite() throws Exception {
+ runSimpleBulkWriteTest(1);
+ }
+
+ /** Bulk-writing an empty list must return an empty list of futures. */
+ @Test(timeout = 60000)
+ public void testBulkWriteEmptyList() throws Exception {
+ String name = String.format("dlserver-bulk-write-%d", 0);
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
+ List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+
+ assertEquals(0, futures.size());
+ }
+
+ /**
+ * Bulk-writing a list that contains a null buffer is expected to throw
+ * {@link NullPointerException} from the writeBulk call itself.
+ */
+ @Test(timeout = 60000)
+ public void testBulkWriteNullArg() throws Exception {
+
+ String name = String.format("dlserver-bulk-write-%s", "null");
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
+ writes.add(null);
+
+ try {
+ dlClient.dlClient.writeBulk(name, writes);
+ fail("should not have succeeded");
+ } catch (NullPointerException npe) {
+ // expected
+ logger.info("Expected to catch NullPointException.");
+ }
+ }
+
+ /** Bulk-writing two empty buffers must yield two futures that both complete without throwing. */
+ @Test(timeout = 60000)
+ public void testBulkWriteEmptyBuffer() throws Exception {
+ String name = String.format("dlserver-bulk-write-%s", "empty");
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ List<ByteBuffer> writes = new ArrayList<ByteBuffer>();
+ writes.add(ByteBuffer.wrap(("").getBytes()));
+ writes.add(ByteBuffer.wrap(("").getBytes()));
+ List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+ assertEquals(2, futures.size());
+ for (Future<DLSN> future : futures) {
+ // No throw == pass
+ DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+ }
+ }
+
+ /**
+  * Logs an unexpected exception and fails the current test.
+  *
+  * <p>The log line is prefixed with the running test's name (from the {@code testName} rule)
+  * because this helper is shared by several tests; a fixed test name in the message would
+  * misattribute the failure.
+  *
+  * @param ex the unexpected exception
+  */
+ void failDueToWrongException(Exception ex) {
+     logger.info(testName.getMethodName() + ": ", ex);
+     fail(String.format("failed with wrong exception %s", ex.getClass().getName()));
+ }
+
+ /**
+  * Awaits the futures at indices {@code [start, finish)} and expects each one to fail with a
+  * {@link DLException}; any other exception fails the test.
+  *
+  * @param futures futures to inspect
+  * @param start first index, inclusive
+  * @param finish last index, exclusive
+  * @return number of futures that failed with a {@link DLException}
+  */
+ int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) {
+     int dlFailures = 0;
+     for (int index = start; index < finish; index++) {
+         Future<DLSN> pending = futures.get(index);
+         try {
+             Await.result(pending, Duration.fromSeconds(10));
+             fail("future should have failed!");
+         } catch (DLException expected) {
+             dlFailures++;
+         } catch (Exception unexpected) {
+             failDueToWrongException(unexpected);
+         }
+     }
+     return dlFailures;
+ }
+
+ void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
+ try {
+ Await.result(future, Duration.fromSeconds(10));
+ fail("should have failed");
+ } catch (DLException dle) {
+ assertEquals(StatusCode.TOO_LARGE_RECORD.getValue(), dle.getCode());
+ } catch (Exception ex) {
+ failDueToWrongException(ex);
+ }
+ }
+
+ /**
+ * Bulk-writes 100 valid records, one oversized record, then 100 more valid records, and
+ * checks that only the oversized record fails.
+ */
+ @Test(timeout = 60000)
+ public void testBulkWritePartialFailure() throws Exception {
+ String name = String.format("dlserver-bulk-write-%s", "partial-failure");
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ final int writeCount = 100;
+
+ List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1);
+ for (long i = 1; i <= writeCount; i++) {
+ writes.add(ByteBuffer.wrap(("" + i).getBytes()));
+ }
+ // Too big, will cause partial failure.
+ ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
+ writes.add(buf);
+ for (long i = 1; i <= writeCount; i++) {
+ writes.add(ByteBuffer.wrap(("" + i).getBytes()));
+ }
+
+ // Count succeeded.
+ List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+ int succeeded = 0;
+ // The first writeCount futures precede the oversized record and must all succeed.
+ for (int i = 0; i < writeCount; i++) {
+ Future<DLSN> future = futures.get(i);
+ try {
+ Await.result(future, Duration.fromSeconds(10));
+ ++succeeded;
+ } catch (Exception ex) {
+ failDueToWrongException(ex);
+ }
+ }
+
+ // Index writeCount is the oversized record; the records after it must still succeed.
+ validateFailedAsLogRecordTooLong(futures.get(writeCount));
+ FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
+ assertEquals(writeCount, succeeded);
+ }
+
+ /**
+ * Bulk-writes an oversized record followed by 100 valid records; the first future must fail
+ * as too large while the remaining futures are awaited as successful.
+ */
+ @Test(timeout = 60000)
+ public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception {
+ String name = String.format("dlserver-bulk-write-%s", "first-write-failed");
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ final int writeCount = 100;
+ List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
+ // One byte over the maximum record size.
+ ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
+ writes.add(buf);
+ for (long i = 1; i <= writeCount; i++) {
+ writes.add(ByteBuffer.wrap(("" + i).getBytes()));
+ }
+
+ List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
+ validateFailedAsLogRecordTooLong(futures.get(0));
+ FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
+ }
+
+ /**
+ * Injects a lost-lock failpoint and checks that a bulk write issued directly against the
+ * service implementation reports {@code StatusCode.LOCKING_EXCEPTION} in its response header.
+ */
+ @Test(timeout = 60000)
+ public void testBulkWriteTotalFailureLostLock() throws Exception {
+ String name = String.format("dlserver-bulk-write-%s", "lost-lock");
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ final int writeCount = 8;
+ List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1);
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ writes.add(buf);
+ for (long i = 1; i <= writeCount; i++) {
+ writes.add(ByteBuffer.wrap(("" + i).getBytes()));
+ }
+ // Warm it up with a write.
+ Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8)));
+
+ // Failpoint a lost lock, make sure the failure gets promoted to an operation failure.
+ DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft();
+ try {
+ FailpointUtils.setFailpoint(
+ FailpointUtils.FailPointName.FP_WriteInternalLostLock,
+ FailpointUtils.FailPointActions.FailPointAction_Default
+ );
+ Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext());
+ assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code);
+ } finally {
+ // Always remove the failpoint, whether or not the assertion passed.
+ FailpointUtils.removeFailpoint(
+ FailpointUtils.FailPointName.FP_WriteInternalLostLock
+ );
+ }
+ }
+
+ /**
+  * Sends ten checks to a stream, writes a single user record, and verifies that reading
+  * from the initial DLSN yields exactly that record at entry id 1.
+  *
+  * <p>The reader and the log manager are closed in {@code finally} blocks so they are
+  * released even when an assertion fails.
+  */
+ @Test(timeout = 60000)
+ public void testHeartbeat() throws Exception {
+     String name = "dlserver-heartbeat";
+
+     dlClient.routingService.addHost(name, dlServer.getAddress());
+
+     for (long i = 1; i <= 10; i++) {
+         logger.debug("Send heartbeat {} to stream {}.", i, name);
+         dlClient.dlClient.check(name).get();
+     }
+
+     logger.debug("Write entry one to stream {}.", name);
+     dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get();
+
+     Thread.sleep(1000);
+
+     DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+     try {
+         LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
+         try {
+             int numRead = 0;
+             // eid=0 => control records
+             // other 9 heartbeats will not trigger writing any control records.
+             // eid=1 => user entry
+             long startEntryId = 1;
+             LogRecordWithDLSN r = reader.readNext(false);
+             while (null != r) {
+                 int i = Integer.parseInt(new String(r.getPayload()));
+                 assertEquals(numRead + 1, i);
+                 // Expected value first: the record's DLSN must equal (1, startEntryId, 0).
+                 assertEquals(0, r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)));
+                 ++numRead;
+                 ++startEntryId;
+                 r = reader.readNext(false);
+             }
+             assertEquals(1, numRead);
+         } finally {
+             reader.close();
+         }
+     } finally {
+         dlm.close();
+     }
+ }
+
+ /**
+ * Writes entries 1..10, fences the stream, writes entries 11..20, and verifies that all 20
+ * entries can be read back in order.
+ */
+ @Test(timeout = 60000)
+ public void testFenceWrite() throws Exception {
+ String name = "dlserver-fence-write";
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ for (long i = 1; i <= 10; i++) {
+ logger.debug("Write entry {} to stream {}.", i, name);
+ dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
+ }
+
+ Thread.sleep(1000);
+
+ logger.info("Fencing stream {}.", name);
+ DLMTestUtil.fenceStream(conf, getUri(), name);
+ logger.info("Fenced stream {}.", name);
+
+ // Writes issued after the fence are still expected to succeed through the same client.
+ for (long i = 11; i <= 20; i++) {
+ logger.debug("Write entry {} to stream {}.", i, name);
+ dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get();
+ }
+
+ DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
+ LogReader reader = dlm.getInputStream(1);
+ int numRead = 0;
+ LogRecord r = reader.readNext(false);
+ // Each payload is the decimal text of its 1-based position in the stream.
+ while (null != r) {
+ int i = Integer.parseInt(new String(r.getPayload()));
+ assertEquals(numRead + 1, i);
+ ++numRead;
+ r = reader.readNext(false);
+ }
+ assertEquals(20, numRead);
+ reader.close();
+ dlm.close();
+ }
+
+ /**
+ * Writes payloads 101..110, deletes the stream, verifies that reading it fails with
+ * {@link LogNotFoundException}, then writes payloads 201..210 to the same name and verifies
+ * that only the new entries are readable.
+ */
+ @Test(timeout = 60000)
+ public void testDeleteStream() throws Exception {
+ String name = "dlserver-delete-stream";
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ long txid = 101;
+ for (long i = 1; i <= 10; i++) {
+ long curTxId = txid++;
+ logger.debug("Write entry {} to stream {}.", curTxId, name);
+ dlClient.dlClient.write(name,
+ ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+ }
+
+ // NOTE(review): this checkStream overload is not defined in the visible part of the file;
+ // presumably it is declared further down in this class.
+ checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
+
+ dlClient.dlClient.delete(name).get();
+
+ checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
+
+ Thread.sleep(1000);
+
+ DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri());
+ AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN));
+ try {
+ FutureUtils.result(reader101.readNext());
+ fail("Should fail with LogNotFoundException since the stream is deleted");
+ } catch (LogNotFoundException lnfe) {
+ // expected
+ }
+ FutureUtils.result(reader101.asyncClose());
+ dlm101.close();
+
+ // Write a second batch to the same stream name after the delete.
+ txid = 201;
+ for (long i = 1; i <= 10; i++) {
+ long curTxId = txid++;
+ logger.debug("Write entry {} to stream {}.", curTxId, name);
+ DLSN dlsn = dlClient.dlClient.write(name,
+ ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+ }
+ Thread.sleep(1000);
+
+ DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri());
+ LogReader reader201 = dlm201.getInputStream(1);
+ int numRead = 0;
+ int curTxId = 201;
+ LogRecord r = reader201.readNext(false);
+ // Only the second batch (201..210) is expected to be readable.
+ while (null != r) {
+ int i = Integer.parseInt(new String(r.getPayload()));
+ assertEquals(curTxId++, i);
+ ++numRead;
+ r = reader201.readNext(false);
+ }
+ assertEquals(10, numRead);
+ reader201.close();
+ dlm201.close();
+ }
+
+ /**
+  * With ad-hoc stream creation disabled, explicitly creates a stream, writes ten entries to
+  * it, and verifies the server has acquired it afterwards.
+  */
+ @Test(timeout = 60000)
+ public void testCreateStream() throws Exception {
+     try {
+         setupNoAdHocCluster();
+         final String name = "dlserver-create-stream";
+
+         // Route by the same name used for create/write instead of repeating the literal.
+         noAdHocClient.routingService.addHost(name, noAdHocServer.getAddress());
+         assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+         assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+
+         long txid = 101;
+         for (long i = 1; i <= 10; i++) {
+             long curTxId = txid++;
+             logger.debug("Write entry {} to stream {}.", curTxId, name);
+             noAdHocClient.dlClient.write(name,
+                     ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+         }
+
+         assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+     } finally {
+         tearDownNoAdHocCluster();
+     }
+ }
+
+ /**
+  * This tests that create has touch-like behavior: creating the same stream a second time
+  * simply does nothing, and the stream stays acquired on the server.
+  */
+ @Test(timeout = 60000)
+ public void testCreateStreamTwice() throws Exception {
+     try {
+         setupNoAdHocCluster();
+         final String name = "dlserver-create-stream-twice";
+
+         // Route by the same name used for create/write instead of repeating the literal.
+         noAdHocClient.routingService.addHost(name, noAdHocServer.getAddress());
+         assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+         assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+
+         long txid = 101;
+         for (long i = 1; i <= 10; i++) {
+             long curTxId = txid++;
+             logger.debug("Write entry {} to stream {}.", curTxId, name);
+             noAdHocClient.dlClient.write(name,
+                     ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+         }
+
+         assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+
+         // create again
+         assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn());
+         assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name));
+     } finally {
+         tearDownNoAdHocCluster();
+     }
+ }
+
+
+
+ /**
+ * Writes payloads 1..20 in two batches of ten (releasing the stream after the first batch),
+ * truncates at the DLSN of payload 11, and verifies that exactly payloads 11..20 remain readable.
+ */
+ @Test(timeout = 60000)
+ public void testTruncateStream() throws Exception {
+ String name = "dlserver-truncate-stream";
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ long txid = 1;
+ // Remembers the DLSN returned for each written payload value.
+ Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>();
+ for (int s = 1; s <= 2; s++) {
+ for (long i = 1; i <= 10; i++) {
+ long curTxId = txid++;
+ logger.debug("Write entry {} to stream {}.", curTxId, name);
+ DLSN dlsn = dlClient.dlClient.write(name,
+ ByteBuffer.wrap(("" + curTxId).getBytes())).get();
+ txid2DLSN.put(curTxId, dlsn);
+ }
+ // Release the stream between the two batches only.
+ if (s == 1) {
+ dlClient.dlClient.release(name).get();
+ }
+ }
+
+ DLSN dlsnToDelete = txid2DLSN.get(11L);
+ dlClient.dlClient.truncate(name, dlsnToDelete).get();
+
+ DistributedLogManager readDLM = DLMTestUtil.createNewDLM(name, conf, getUri());
+ LogReader reader = readDLM.getInputStream(1);
+ int numRead = 0;
+ int curTxId = 11;
+ LogRecord r = reader.readNext(false);
+ while (null != r) {
+ int i = Integer.parseInt(new String(r.getPayload()));
+ assertEquals(curTxId++, i);
+ ++numRead;
+ r = reader.readNext(false);
+ }
+ assertEquals(10, numRead);
+ reader.close();
+ readDLM.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testRequestDenied() throws Exception {
+ String name = "request-denied";
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ AccessControlEntry ace = new AccessControlEntry();
+ ace.setDenyWrite(true);
+ ZooKeeperClient zkc = TestZooKeeperClientBuilder
+ .newBuilder()
+ .uri(getUri())
+ .connectionTimeoutMs(60000)
+ .sessionTimeoutMs(60000)
+ .build(); // NOTE(review): zkc is not closed anywhere in this test - confirm whether it should be
+ DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
+ BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
+ String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
+ ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);
+ accessControl.create(zkc);
+ // Poll the access control manager until it stops allowing writes to the stream.
+ AccessControlManager acm = dlNamespace.createAccessControlManager();
+ while (acm.allowWrite(name)) {
+ Thread.sleep(100);
+ }
+
+ try {
+ Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+ fail("Should fail with request denied exception");
+ } catch (DLException dle) {
+ assertEquals(StatusCode.REQUEST_DENIED.getValue(), dle.getCode());
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testNoneStreamNameRegex() throws Exception {
+ String streamNamePrefix = "none-stream-name-regex-";
+ int numStreams = 5;
+ Set<String> streams = new HashSet<String>();
+
+ for (int i = 0; i < numStreams; i++) {
+ streams.add(streamNamePrefix + i);
+ }
+ testStreamNameRegex(streams, ".*", streams);
+ }
+
+ @Test(timeout = 60000)
+ public void testStreamNameRegex() throws Exception {
+ String streamNamePrefix = "stream-name-regex-";
+ int numStreams = 5;
+ Set<String> streams = new HashSet<String>();
+ Set<String> expectedStreams = new HashSet<String>();
+ String streamNameRegex = streamNamePrefix + "1";
+
+ for (int i = 0; i < numStreams; i++) {
+ streams.add(streamNamePrefix + i);
+ }
+ expectedStreams.add(streamNamePrefix + "1");
+
+ testStreamNameRegex(streams, streamNameRegex, expectedStreams);
+ }
+
+ private void testStreamNameRegex(Set<String> streams, String streamNameRegex,
+ Set<String> expectedStreams)
+ throws Exception {
+ for (String streamName : streams) {
+ dlClient.routingService.addHost(streamName, dlServer.getAddress());
+ Await.result(dlClient.dlClient.write(streamName,
+ ByteBuffer.wrap(streamName.getBytes(UTF_8))));
+ }
+
+ DLClient client = createDistributedLogClient(
+ "test-stream-name-regex",
+ streamNameRegex,
+ Optional.<String>absent());
+ try {
+ client.routingService.addHost("unknown", dlServer.getAddress());
+ client.handshake();
+ Map<SocketAddress, Set<String>> distribution =
+ client.dlClient.getStreamOwnershipDistribution();
+ assertEquals(1, distribution.size());
+ Set<String> cachedStreams = distribution.values().iterator().next();
+ assertNotNull(cachedStreams);
+ assertEquals(expectedStreams.size(), cachedStreams.size());
+
+ for (String streamName : cachedStreams) {
+ assertTrue(expectedStreams.contains(streamName));
+ }
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testReleaseStream() throws Exception {
+ String name = "dlserver-release-stream";
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+
+ Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+ checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
+
+ // release the stream
+ Await.result(dlClient.dlClient.release(name));
+ checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
+ }
+
+ protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize,
+ String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) {
+ Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution();
+ assertEquals(expectedNumProxiesInClient, distribution.size());
+
+ if (expectedNumProxiesInClient > 0) {
+ Map.Entry<SocketAddress, Set<String>> localEntry =
+ distribution.entrySet().iterator().next();
+ assertEquals(owner, localEntry.getKey());
+ assertEquals(expectedClientCacheSize, localEntry.getValue().size());
+ assertEquals(existedInClient, localEntry.getValue().contains(name));
+ }
+
+ // Server side: the cached and the acquired stream sets are each checked against expectedServerCacheSize.
+ StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager();
+ Set<String> cachedStreams = streamManager.getCachedStreams().keySet();
+ Set<String> acquiredStreams = streamManager.getAcquiredStreams().keySet();
+
+ assertEquals(expectedServerCacheSize, cachedStreams.size());
+ assertEquals(existedInServer, cachedStreams.contains(name));
+ assertEquals(expectedServerCacheSize, acquiredStreams.size());
+ assertEquals(existedInServer, acquiredStreams.contains(name));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
new file mode 100644
index 0000000..c7ae960
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerClientRouting.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.fail;
+
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import org.junit.Test;
+
+/**
+ * Test the server with client side routing.
+ */
+public class TestDistributedLogServerClientRouting extends TestDistributedLogServerBase {
+
+ public TestDistributedLogServerClientRouting() {
+ super(true);
+ }
+
+ @Test(timeout = 60000)
+ public void testAcceptNewStream() throws Exception {
+ String name = "dlserver-accept-new-stream";
+
+ dlClient.routingService.addHost(name, dlServer.getAddress());
+ dlClient.routingService.setAllowRetrySameHost(false);
+
+ Await.result(dlClient.dlClient.setAcceptNewStream(false));
+
+ try {
+ Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+ fail("Should fail because the proxy couldn't accept new stream");
+ } catch (NoBrokersAvailableException nbae) {
+ // expected
+ }
+ checkStream(0, 0, 0, name, dlServer.getAddress(), false, false);
+
+ Await.result(dlServer.dlServer.getLeft().setAcceptNewStream(true));
+ Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8))));
+ checkStream(1, 1, 1, name, dlServer.getAddress(), true, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
new file mode 100644
index 0000000..12416a3
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerServerRouting.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+/**
+ * Test the server with server side routing.
+ */
+public class TestDistributedLogServerServerRouting extends TestDistributedLogServerBase {
+
+ public TestDistributedLogServerServerRouting() {
+ super(false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
new file mode 100644
index 0000000..4a2d65f
--- /dev/null
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
@@ -0,0 +1,833 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.distributedlog.acl.DefaultAccessControlManager;
+import org.apache.distributedlog.client.routing.LocalRoutingService;
+import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.config.NullStreamConfigProvider;
+import org.apache.distributedlog.service.config.ServerConfiguration;
+import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
+import org.apache.distributedlog.service.stream.Stream;
+import org.apache.distributedlog.service.stream.StreamImpl;
+import org.apache.distributedlog.service.stream.StreamImpl.StreamStatus;
+import org.apache.distributedlog.service.stream.StreamManagerImpl;
+import org.apache.distributedlog.service.stream.WriteOp;
+import org.apache.distributedlog.service.streamset.DelimiterStreamPartitionConverter;
+import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.feature.SettableFeature;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for DistributedLog Service.
+ */
+public class TestDistributedLogService extends TestDistributedLogBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class);
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private ServerConfiguration serverConf;
+ private DistributedLogConfiguration dlConf;
+ private URI uri;
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private DistributedLogServiceImpl service;
+
+ @Before
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ dlConf = new DistributedLogConfiguration();
+ dlConf.addConfiguration(conf);
+ dlConf.setLockTimeout(0)
+ .setOutputBufferSize(0)
+ .setPeriodicFlushFrequencyMilliSeconds(10)
+ .setSchedulerShutdownTimeoutMs(100);
+ serverConf = newLocalServerConf();
+ uri = createDLMURI("/" + testName.getMethodName());
+ ensureURICreated(uri);
+ service = createService(serverConf, dlConf, latch);
+ }
+
+ @After
+ @Override
+ public void teardown() throws Exception {
+ if (null != service) {
+ service.shutdown();
+ }
+ super.teardown();
+ }
+
+ private DistributedLogConfiguration newLocalConf() {
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(dlConf);
+ return confLocal;
+ }
+
+ private ServerConfiguration newLocalServerConf() {
+ ServerConfiguration serverConf = new ServerConfiguration();
+ serverConf.loadConf(dlConf);
+ serverConf.setServerThreads(1);
+ return serverConf;
+ }
+
+ private DistributedLogServiceImpl createService(
+ ServerConfiguration serverConf,
+ DistributedLogConfiguration dlConf) throws Exception {
+ return createService(serverConf, dlConf, new CountDownLatch(1));
+ }
+
+ private DistributedLogServiceImpl createService(
+ ServerConfiguration serverConf,
+ DistributedLogConfiguration dlConf,
+ CountDownLatch latch) throws Exception {
+ // Build the stream partition converter
+ StreamPartitionConverter converter;
+ try {
+ converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
+ } catch (ConfigurationException e) {
+ logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
+ IdentityStreamPartitionConverter.class.getName());
+ converter = new IdentityStreamPartitionConverter();
+ }
+ return new DistributedLogServiceImpl(
+ serverConf,
+ dlConf,
+ ConfUtils.getConstDynConf(dlConf),
+ new NullStreamConfigProvider(),
+ uri,
+ converter,
+ new LocalRoutingService(),
+ NullStatsLogger.INSTANCE,
+ NullStatsLogger.INSTANCE,
+ latch,
+ new EqualLoadAppraiser());
+ }
+
+ private StreamImpl createUnstartedStream(DistributedLogServiceImpl service,
+ String name) throws Exception {
+ StreamImpl stream = (StreamImpl) service.newStream(name);
+ stream.initialize();
+ return stream;
+ }
+
+ private ByteBuffer createRecord(long txid) {
+ return ByteBuffer.wrap(("record-" + txid).getBytes(UTF_8));
+ }
+
+ private WriteOp createWriteOp(DistributedLogServiceImpl service,
+ String streamName,
+ long txid) {
+ ByteBuffer data = createRecord(txid);
+ return service.newWriteOp(streamName, data, null);
+ }
+
+ @Test(timeout = 60000)
+ public void testAcquireStreams() throws Exception {
+ String streamName = testName.getMethodName();
+ StreamImpl s0 = createUnstartedStream(service, streamName);
+ ServerConfiguration serverConf1 = new ServerConfiguration();
+ serverConf1.addConfiguration(serverConf);
+ serverConf1.setServerPort(9999);
+ DistributedLogServiceImpl service1 = createService(serverConf1, dlConf);
+ StreamImpl s1 = createUnstartedStream(service1, streamName);
+
+ // create write ops
+ WriteOp op0 = createWriteOp(service, streamName, 0L);
+ s0.submit(op0);
+
+ WriteOp op1 = createWriteOp(service1, streamName, 1L);
+ s1.submit(op1);
+
+ // check pending size
+ assertEquals("Write Op 0 should be pending in service 0",
+ 1, s0.numPendingOps());
+ assertEquals("Write Op 1 should be pending in service 1",
+ 1, s1.numPendingOps());
+
+ // start acquiring s0
+ s0.start();
+ WriteResponse wr0 = Await.result(op0.result());
+ assertEquals("Op 0 should succeed",
+ StatusCode.SUCCESS, wr0.getHeader().getCode());
+ assertEquals("Service 0 should acquire stream",
+ StreamStatus.INITIALIZED, s0.getStatus());
+ assertNotNull(s0.getManager());
+ assertNotNull(s0.getWriter());
+ assertNull(s0.getLastException());
+
+ // start acquiring s1
+ s1.start();
+ WriteResponse wr1 = Await.result(op1.result());
+ assertEquals("Op 1 should fail",
+ StatusCode.FOUND, wr1.getHeader().getCode());
+ // the stream will be set to ERROR and then be closed.
+ assertTrue("Service 1 should be in unavailable state",
+ StreamStatus.isUnavailable(s1.getStatus()));
+ assertNotNull(s1.getManager());
+ assertNull(s1.getWriter());
+ assertNotNull(s1.getLastException());
+ assertTrue(s1.getLastException() instanceof OwnershipAcquireFailedException);
+
+ service1.shutdown();
+ }
+
+ @Test(timeout = 60000)
+ public void testAcquireStreamsWhenExceedMaxCachedPartitions() throws Exception {
+ String streamName = testName.getMethodName() + "_0000";
+
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(dlConf);
+ confLocal.setMaxCachedPartitionsPerProxy(1);
+
+ ServerConfiguration serverConfLocal = new ServerConfiguration();
+ serverConfLocal.addConfiguration(serverConf);
+ serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class);
+
+ DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal);
+ Stream stream = serviceLocal.getLogWriter(streamName);
+
+ // stream is cached
+ assertNotNull(stream);
+ assertEquals(1, serviceLocal.getStreamManager().numCached());
+
+ // create write ops
+ WriteOp op0 = createWriteOp(service, streamName, 0L);
+ stream.submit(op0);
+ WriteResponse wr0 = Await.result(op0.result());
+ assertEquals("Op 0 should succeed",
+ StatusCode.SUCCESS, wr0.getHeader().getCode());
+ assertEquals(1, serviceLocal.getStreamManager().numAcquired());
+
+ // should fail to acquire another partition
+ try {
+ serviceLocal.getLogWriter(testName.getMethodName() + "_0001");
+ fail("Should fail to acquire new streams");
+ } catch (StreamUnavailableException sue) {
+ // expected
+ }
+ assertEquals(1, serviceLocal.getStreamManager().numCached());
+ assertEquals(1, serviceLocal.getStreamManager().numAcquired());
+
+ // should be able to acquire partitions from other streams
+ String anotherStreamName = testName.getMethodName() + "-another_0001";
+ Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName);
+ assertNotNull(anotherStream);
+ assertEquals(2, serviceLocal.getStreamManager().numCached());
+
+ // create write ops
+ WriteOp op1 = createWriteOp(service, anotherStreamName, 0L);
+ anotherStream.submit(op1);
+ WriteResponse wr1 = Await.result(op1.result());
+ assertEquals("Op 1 should succeed",
+ StatusCode.SUCCESS, wr1.getHeader().getCode());
+ assertEquals(2, serviceLocal.getStreamManager().numAcquired());
+ }
+
+ @Test(timeout = 60000)
+ public void testAcquireStreamsWhenExceedMaxAcquiredPartitions() throws Exception {
+ String streamName = testName.getMethodName() + "_0000";
+
+ DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+ confLocal.addConfiguration(dlConf);
+ confLocal.setMaxCachedPartitionsPerProxy(-1);
+ confLocal.setMaxAcquiredPartitionsPerProxy(1);
+
+ ServerConfiguration serverConfLocal = new ServerConfiguration();
+ serverConfLocal.addConfiguration(serverConf);
+ serverConfLocal.setStreamPartitionConverterClass(DelimiterStreamPartitionConverter.class);
+
+ DistributedLogServiceImpl serviceLocal = createService(serverConfLocal, confLocal);
+ Stream stream = serviceLocal.getLogWriter(streamName);
+
+ // stream is cached
+ assertNotNull(stream);
+ assertEquals(1, serviceLocal.getStreamManager().numCached());
+
+ // create write ops
+ WriteOp op0 = createWriteOp(service, streamName, 0L);
+ stream.submit(op0);
+ WriteResponse wr0 = Await.result(op0.result());
+ assertEquals("Op 0 should succeed",
+ StatusCode.SUCCESS, wr0.getHeader().getCode());
+ assertEquals(1, serviceLocal.getStreamManager().numAcquired());
+
+ // should be able to cache partitions from same stream
+ String anotherStreamName = testName.getMethodName() + "_0001";
+ Stream anotherStream = serviceLocal.getLogWriter(anotherStreamName);
+ assertNotNull(anotherStream);
+ assertEquals(2, serviceLocal.getStreamManager().numCached());
+
+ // create write ops
+ WriteOp op1 = createWriteOp(service, anotherStreamName, 0L);
+ anotherStream.submit(op1);
+ WriteResponse wr1 = Await.result(op1.result());
+ assertEquals("Op 1 should fail",
+ StatusCode.STREAM_UNAVAILABLE, wr1.getHeader().getCode());
+ assertEquals(1, serviceLocal.getStreamManager().numAcquired());
+ }
+
+ @Test(timeout = 60000)
+ public void testCloseShouldErrorOutPendingOps() throws Exception {
+ String streamName = testName.getMethodName();
+ StreamImpl s = createUnstartedStream(service, streamName);
+
+ int numWrites = 10;
+ List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
+ for (int i = 0; i < numWrites; i++) {
+ WriteOp op = createWriteOp(service, streamName, i);
+ s.submit(op);
+ futureList.add(op.result());
+ }
+ assertEquals(numWrites, s.numPendingOps());
+ Await.result(s.requestClose("close stream"));
+ assertEquals("Stream " + streamName + " is set to " + StreamStatus.CLOSED,
+ StreamStatus.CLOSED, s.getStatus());
+ for (int i = 0; i < numWrites; i++) {
+ Future<WriteResponse> future = futureList.get(i);
+ WriteResponse wr = Await.result(future);
+ assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE,
+ StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode());
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testCloseTwice() throws Exception {
+ String streamName = testName.getMethodName();
+ StreamImpl s = createUnstartedStream(service, streamName);
+
+ int numWrites = 10;
+ List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
+ for (int i = 0; i < numWrites; i++) {
+ WriteOp op = createWriteOp(service, streamName, i);
+ s.submit(op);
+ futureList.add(op.result());
+ }
+ assertEquals(numWrites, s.numPendingOps());
+
+ Future<Void> closeFuture0 = s.requestClose("close 0");
+ assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
+ StreamStatus.CLOSING == s.getStatus()
+ || StreamStatus.CLOSED == s.getStatus());
+ Future<Void> closeFuture1 = s.requestClose("close 1");
+ assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
+ StreamStatus.CLOSING == s.getStatus()
+ || StreamStatus.CLOSED == s.getStatus());
+
+ Await.result(closeFuture0);
+ assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
+ StreamStatus.CLOSED, s.getStatus());
+ Await.result(closeFuture1);
+ assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
+ StreamStatus.CLOSED, s.getStatus());
+
+ for (int i = 0; i < numWrites; i++) {
+ Future<WriteResponse> future = futureList.get(i);
+ WriteResponse wr = Await.result(future);
+ assertEquals("Pending op should fail with " + StatusCode.STREAM_UNAVAILABLE,
+ StatusCode.STREAM_UNAVAILABLE, wr.getHeader().getCode());
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testFailRequestsDuringClosing() throws Exception {
+ String streamName = testName.getMethodName();
+ StreamImpl s = createUnstartedStream(service, streamName);
+
+ Future<Void> closeFuture = s.requestClose("close");
+ assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
+ StreamStatus.CLOSING == s.getStatus()
+ || StreamStatus.CLOSED == s.getStatus());
+ WriteOp op1 = createWriteOp(service, streamName, 0L);
+ s.submit(op1);
+ WriteResponse response1 = Await.result(op1.result());
+ assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closing",
+ StatusCode.STREAM_UNAVAILABLE, response1.getHeader().getCode());
+
+ Await.result(closeFuture);
+ assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
+ StreamStatus.CLOSED, s.getStatus());
+ WriteOp op2 = createWriteOp(service, streamName, 1L);
+ s.submit(op2);
+ WriteResponse response2 = Await.result(op2.result());
+ assertEquals("Op should fail with " + StatusCode.STREAM_UNAVAILABLE + " if it is closed",
+ StatusCode.STREAM_UNAVAILABLE, response2.getHeader().getCode());
+ }
+
+ @Test(timeout = 60000)
+ public void testServiceTimeout() throws Exception {
+ DistributedLogConfiguration confLocal = newLocalConf();
+ confLocal.setOutputBufferSize(Integer.MAX_VALUE)
+ .setImmediateFlushEnabled(false)
+ .setPeriodicFlushFrequencyMilliSeconds(0);
+ ServerConfiguration serverConfLocal = newLocalServerConf();
+ serverConfLocal.addConfiguration(serverConf);
+ serverConfLocal.setServiceTimeoutMs(200)
+ .setStreamProbationTimeoutMs(100);
+ String streamName = testName.getMethodName();
+ // create a new service with 200ms timeout
+ DistributedLogServiceImpl localService = createService(serverConfLocal, confLocal);
+ StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
+
+ int numWrites = 10;
+ List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>(numWrites);
+ for (int i = 0; i < numWrites; i++) {
+ futureList.add(localService.write(streamName, createRecord(i)));
+ }
+
+ assertTrue("Stream " + streamName + " should be cached",
+ streamManager.getCachedStreams().containsKey(streamName));
+
+ StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName);
+ // the stream should be set CLOSING
+ while (StreamStatus.CLOSING != s.getStatus()
+ && StreamStatus.CLOSED != s.getStatus()) {
+ TimeUnit.MILLISECONDS.sleep(20);
+ }
+ assertNotNull("Writer should be initialized", s.getWriter());
+ assertNull("No exception should be thrown", s.getLastException());
+ Future<Void> closeFuture = s.getCloseFuture();
+ Await.result(closeFuture);
+ for (int i = 0; i < numWrites; i++) {
+ assertTrue("Write should not fail before closing",
+ futureList.get(i).isDefined());
+ WriteResponse response = Await.result(futureList.get(i));
+ assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION,
+ StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
+ || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+ || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
+ }
+
+ while (streamManager.getCachedStreams().containsKey(streamName)) {
+ TimeUnit.MILLISECONDS.sleep(20);
+ }
+
+ assertFalse("Stream should be removed from cache",
+ streamManager.getCachedStreams().containsKey(streamName));
+ assertFalse("Stream should be removed from acquired cache",
+ streamManager.getAcquiredStreams().containsKey(streamName));
+
+ localService.shutdown();
+ }
+
+ private DistributedLogServiceImpl createConfiguredLocalService() throws Exception {
+ DistributedLogConfiguration confLocal = newLocalConf();
+ confLocal.setOutputBufferSize(0)
+ .setImmediateFlushEnabled(true)
+ .setPeriodicFlushFrequencyMilliSeconds(0);
+ return createService(serverConf, confLocal);
+ }
+
+ private ByteBuffer getTestDataBuffer() { // returns a new array-backed buffer on every call
+ return ByteBuffer.wrap("test-data".getBytes(UTF_8));
+ }
+
+ @Test(timeout = 60000)
+ public void testNonDurableWrite() throws Exception {
+ DistributedLogConfiguration confLocal = newLocalConf();
+ confLocal.setOutputBufferSize(Integer.MAX_VALUE)
+ .setImmediateFlushEnabled(false)
+ .setPeriodicFlushFrequencyMilliSeconds(0)
+ .setDurableWriteEnabled(false);
+ ServerConfiguration serverConfLocal = new ServerConfiguration();
+ serverConfLocal.addConfiguration(serverConf);
+ serverConfLocal.enableDurableWrite(false);
+ serverConfLocal.setServiceTimeoutMs(Integer.MAX_VALUE)
+ .setStreamProbationTimeoutMs(Integer.MAX_VALUE);
+ String streamName = testName.getMethodName();
+ DistributedLogServiceImpl localService =
+ createService(serverConfLocal, confLocal);
+ StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
+
+ int numWrites = 10;
+ List<Future<WriteResponse>> futureList = new ArrayList<Future<WriteResponse>>();
+ for (int i = 0; i < numWrites; i++) {
+ futureList.add(localService.write(streamName, createRecord(i)));
+ }
+ assertTrue("Stream " + streamName + " should be cached",
+ streamManager.getCachedStreams().containsKey(streamName));
+ List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList));
+ for (WriteResponse wr : resultList) {
+ assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn()));
+ }
+
+ localService.shutdown();
+ }
+
+ @Test(timeout = 60000)
+ public void testWriteOpNoChecksum() throws Exception {
+ DistributedLogServiceImpl localService = createConfiguredLocalService();
+ WriteContext ctx = new WriteContext();
+ Future<WriteResponse> result = localService.writeWithContext("test", getTestDataBuffer(), ctx);
+ WriteResponse resp = Await.result(result);
+ assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+ localService.shutdown();
+ }
+
+ @Test(timeout = 60000)
+ public void testTruncateOpNoChecksum() throws Exception {
+ DistributedLogServiceImpl localService = createConfiguredLocalService();
+ WriteContext ctx = new WriteContext();
+ Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
+ WriteResponse resp = Await.result(result);
+ assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+ localService.shutdown();
+ }
+
+ @Test(timeout = 60000)
+ public void testStreamOpNoChecksum() throws Exception {
+ DistributedLogServiceImpl localService = createConfiguredLocalService();
+ WriteContext ctx = new WriteContext();
+ HeartbeatOptions option = new HeartbeatOptions();
+ option.setSendHeartBeatToReader(true);
+
+ // heartbeat to acquire the stream and then release the stream
+ Future<WriteResponse> result = localService.heartbeatWithOptions("test", ctx, option);
+ WriteResponse resp = Await.result(result);
+ assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+ result = localService.release("test", ctx);
+ resp = Await.result(result);
+ assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+
+ // heartbeat to acquire the stream and then delete the stream
+ result = localService.heartbeatWithOptions("test", ctx, option);
+ resp = Await.result(result);
+ assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+ result = localService.delete("test", ctx);
+ resp = Await.result(result);
+ assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
+
+ // shutdown the local service
+ localService.shutdown();
+ }
+
+ /**
+  * Writes to stream "test" with the CRC32 in the {@link WriteContext} set to the
+  * fixed value 999 and expects {@code StatusCode.CHECKSUM_FAILED}.
+  */
+ @Test(timeout = 60000)
+ public void testWriteOpChecksumBadChecksum() throws Exception {
+     DistributedLogServiceImpl localService = createConfiguredLocalService();
+     try {
+         WriteContext ctx = new WriteContext().setCrc32(999);
+         Future<WriteResponse> result =
+             localService.writeWithContext("test", getTestDataBuffer(), ctx);
+         WriteResponse resp = Await.result(result);
+         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+     } finally {
+         // Shut the service down even when the call or the assertion above fails.
+         localService.shutdown();
+     }
+ }
+
+ /**
+  * Computes the write-op CRC32 for stream name "test" but sends the write to
+  * stream "test1"; the response is expected to carry
+  * {@code StatusCode.CHECKSUM_FAILED}.
+  */
+ @Test(timeout = 60000)
+ public void testWriteOpChecksumBadStream() throws Exception {
+     DistributedLogServiceImpl localService = createConfiguredLocalService();
+     try {
+         // The checksum is derived from "test" ...
+         WriteContext ctx = new WriteContext().setCrc32(
+             ProtocolUtils.writeOpCRC32("test", getTestDataBuffer().array()));
+         // ... while the request targets "test1".
+         Future<WriteResponse> result =
+             localService.writeWithContext("test1", getTestDataBuffer(), ctx);
+         WriteResponse resp = Await.result(result);
+         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+     } finally {
+         // Shut the service down even when the call or the assertion above fails.
+         localService.shutdown();
+     }
+ }
+
+ /**
+  * Computes the write-op CRC32 over the test buffer, then overwrites one byte
+  * of that buffer before sending it; the response is expected to carry
+  * {@code StatusCode.CHECKSUM_FAILED}.
+  */
+ @Test(timeout = 60000)
+ public void testWriteOpChecksumBadData() throws Exception {
+     DistributedLogServiceImpl localService = createConfiguredLocalService();
+     try {
+         ByteBuffer buffer = getTestDataBuffer();
+         WriteContext ctx = new WriteContext().setCrc32(
+             ProtocolUtils.writeOpCRC32("test", buffer.array()));
+
+         // Overwrite 1 byte to corrupt data after the checksum was taken.
+         buffer.put(1, (byte) 0xab);
+         Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx);
+         WriteResponse resp = Await.result(result);
+         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+     } finally {
+         // Shut the service down even when the call or the assertion above fails.
+         localService.shutdown();
+     }
+ }
+
+ /**
+  * Issues heartbeat, release and delete requests against stream "test" with the
+  * CRC32 in the {@link WriteContext} set to the fixed value 999; every response
+  * is expected to carry {@code StatusCode.CHECKSUM_FAILED}.
+  */
+ @Test(timeout = 60000)
+ public void testStreamOpChecksumBadChecksum() throws Exception {
+     DistributedLogServiceImpl localService = createConfiguredLocalService();
+     try {
+         WriteContext ctx = new WriteContext().setCrc32(999);
+
+         Future<WriteResponse> result = localService.heartbeat("test", ctx);
+         WriteResponse resp = Await.result(result);
+         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+
+         result = localService.release("test", ctx);
+         resp = Await.result(result);
+         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+
+         result = localService.delete("test", ctx);
+         resp = Await.result(result);
+         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+     } finally {
+         // Shut the service down even when a call or an assertion above fails.
+         localService.shutdown();
+     }
+ }
+
+ /**
+  * Truncates stream "test" at {@code DLSN(1, 2, 3)} with the CRC32 in the
+  * {@link WriteContext} set to the fixed value 999 and expects
+  * {@code StatusCode.CHECKSUM_FAILED}.
+  */
+ @Test(timeout = 60000)
+ public void testTruncateOpChecksumBadChecksum() throws Exception {
+     DistributedLogServiceImpl localService = createConfiguredLocalService();
+     try {
+         WriteContext ctx = new WriteContext().setCrc32(999);
+         Future<WriteResponse> result =
+             localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
+         WriteResponse resp = Await.result(result);
+         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
+     } finally {
+         // Shut the service down even when the call or the assertion above fails.
+         localService.shutdown();
+     }
+ }
+
+ /**
+  * Builds a {@link WriteOp} whose payload is the bytes of the string "test".
+  *
+  * @param name stream name handed to the op
+  * @param disabledFeature feature instance handed to the op
+  * @param checksum checksum value handed to the op
+  * @return a freshly constructed write op
+  */
+ private WriteOp getWriteOp(String name, SettableFeature disabledFeature, Long checksum) {
+     // Collaborators are created in the same order as the constructor arguments.
+     ByteBuffer payload = ByteBuffer.wrap("test".getBytes());
+     NullStatsLogger firstStatsLogger = new NullStatsLogger();
+     NullStatsLogger secondStatsLogger = new NullStatsLogger();
+     IdentityStreamPartitionConverter partitionConverter = new IdentityStreamPartitionConverter();
+     ServerConfiguration serverConfiguration = new ServerConfiguration();
+
+     return new WriteOp(
+         name,
+         payload,
+         firstStatsLogger,
+         secondStatsLogger,
+         partitionConverter,
+         serverConfiguration,
+         (byte) 0,
+         checksum,
+         false,
+         disabledFeature,
+         DefaultAccessControlManager.INSTANCE);
+ }
+
+ /**
+  * Calls {@code preExecute()} on two write ops built with the checksum value
+  * 919191: while the feature value is 0 the call is expected to throw, and once
+  * the feature value is set to 1 the call is expected to return normally.
+  */
+ @Test(timeout = 60000)
+ public void testStreamOpBadChecksumWithChecksumDisabled() throws Exception {
+     final String stream = testName.getMethodName();
+     final SettableFeature feature = new SettableFeature("", 0);
+
+     // Both ops share the same feature instance, so flipping it affects both.
+     final WriteOp opWhileFeatureIsZero = getWriteOp(stream, feature, 919191L);
+     final WriteOp opAfterFeatureIsOne = getWriteOp(stream, feature, 919191L);
+
+     try {
+         opWhileFeatureIsZero.preExecute();
+         fail("should have thrown");
+     } catch (Exception ignored) {
+         // Expected path: the test only requires that preExecute() throws here.
+         // NOTE(review): the concrete exception type is not visible in this test;
+         // consider narrowing the catch once confirmed.
+     }
+
+     feature.set(1);
+     opAfterFeatureIsOne.preExecute();
+ }
+
+ /**
+  * Calls {@code preExecute()} on two write ops whose checksum is computed with
+  * {@code ProtocolUtils.writeOpCRC32} over the stream name and the bytes of
+  * "test"; the call is expected to return normally both while the feature value
+  * is 1 and after it has been set to 0.
+  */
+ @Test(timeout = 60000)
+ public void testStreamOpGoodChecksumWithChecksumDisabled() throws Exception {
+     final String stream = testName.getMethodName();
+     final SettableFeature feature = new SettableFeature("", 1);
+
+     final Long firstChecksum = ProtocolUtils.writeOpCRC32(stream, "test".getBytes());
+     final WriteOp opWhileFeatureIsOne = getWriteOp(stream, feature, firstChecksum);
+
+     final Long secondChecksum = ProtocolUtils.writeOpCRC32(stream, "test".getBytes());
+     final WriteOp opAfterFeatureIsZero = getWriteOp(stream, feature, secondChecksum);
+
+     opWhileFeatureIsOne.preExecute();
+     feature.set(0);
+     opAfterFeatureIsZero.preExecute();
+ }
+
+ /**
+  * Queues writes on several streams using a configuration with output buffer
+  * size {@code Integer.MAX_VALUE}, immediate flush disabled and periodic flush
+  * frequency 0, then calls {@code closeStreams()}.
+  *
+  * <p>Expects one close result per stream, every queued write to complete with
+  * SUCCESS, WRITE_EXCEPTION or STREAM_UNAVAILABLE, and both the cached and the
+  * acquired stream maps to be empty afterwards.
+  */
+ @Test(timeout = 60000)
+ public void testCloseStreamsShouldFlush() throws Exception {
+     DistributedLogConfiguration confLocal = newLocalConf();
+     confLocal.setOutputBufferSize(Integer.MAX_VALUE)
+         .setImmediateFlushEnabled(false)
+         .setPeriodicFlushFrequencyMilliSeconds(0);
+
+     String streamNamePrefix = testName.getMethodName();
+     DistributedLogServiceImpl localService = createService(serverConf, confLocal);
+     try {
+         StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
+
+         int numStreams = 10;
+         int numWrites = 10;
+         List<Future<WriteResponse>> futureList =
+             Lists.newArrayListWithExpectedSize(numStreams * numWrites);
+         for (int i = 0; i < numStreams; i++) {
+             String streamName = streamNamePrefix + "-" + i;
+             HeartbeatOptions hbOptions = new HeartbeatOptions();
+             hbOptions.setSendHeartBeatToReader(true);
+             // make sure the first log segment of each stream is created
+             FutureUtils.result(
+                 localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
+             for (int j = 0; j < numWrites; j++) {
+                 futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
+             }
+         }
+
+         assertEquals("There should be " + numStreams + " streams in cache",
+             numStreams, streamManager.getCachedStreams().size());
+         // wait until every stream is acquired; bounded by the test timeout
+         while (streamManager.getAcquiredStreams().size() < numStreams) {
+             TimeUnit.MILLISECONDS.sleep(20);
+         }
+
+         Future<List<Void>> closeResult = localService.closeStreams();
+         List<Void> closedStreams = Await.result(closeResult);
+         assertEquals("There should be " + numStreams + " streams closed",
+             numStreams, closedStreams.size());
+         // all writes should be flushed
+         for (Future<WriteResponse> future : futureList) {
+             WriteResponse response = Await.result(future);
+             assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(),
+                 StatusCode.SUCCESS == response.getHeader().getCode()
+                     || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+                     || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode());
+         }
+         assertTrue("There should be no streams in the cache",
+             streamManager.getCachedStreams().isEmpty());
+         assertTrue("There should be no streams in the acquired cache",
+             streamManager.getAcquiredStreams().isEmpty());
+     } finally {
+         // Shut the service down even when a wait or an assertion above fails.
+         localService.shutdown();
+     }
+ }
+
+ /**
+  * Queues writes on several streams, sets every acquired stream's status to
+  * {@code StreamStatus.ERROR}, then calls {@code closeStreams()}.
+  *
+  * <p>Expects one close result per stream, every queued write to complete with
+  * BK_TRANSMIT_ERROR, WRITE_EXCEPTION or WRITE_CANCELLED_EXCEPTION, the acquired
+  * stream map to be empty after the close, and the cached stream map to be
+  * empty once the service has been shut down.
+  */
+ @Test(timeout = 60000)
+ public void testCloseStreamsShouldAbort() throws Exception {
+     DistributedLogConfiguration confLocal = newLocalConf();
+     confLocal.setOutputBufferSize(Integer.MAX_VALUE)
+         .setImmediateFlushEnabled(false)
+         .setPeriodicFlushFrequencyMilliSeconds(0);
+
+     String streamNamePrefix = testName.getMethodName();
+     DistributedLogServiceImpl localService = createService(serverConf, confLocal);
+     StreamManagerImpl streamManager = (StreamManagerImpl) localService.getStreamManager();
+     try {
+         int numStreams = 10;
+         int numWrites = 10;
+         List<Future<WriteResponse>> futureList =
+             Lists.newArrayListWithExpectedSize(numStreams * numWrites);
+         for (int i = 0; i < numStreams; i++) {
+             String streamName = streamNamePrefix + "-" + i;
+             HeartbeatOptions hbOptions = new HeartbeatOptions();
+             hbOptions.setSendHeartBeatToReader(true);
+             // make sure the first log segment of each stream is created
+             FutureUtils.result(
+                 localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
+             for (int j = 0; j < numWrites; j++) {
+                 futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
+             }
+         }
+
+         assertEquals("There should be " + numStreams + " streams in cache",
+             numStreams, streamManager.getCachedStreams().size());
+         // wait until every stream is acquired; bounded by the test timeout
+         while (streamManager.getAcquiredStreams().size() < numStreams) {
+             TimeUnit.MILLISECONDS.sleep(20);
+         }
+
+         // force every acquired stream into the ERROR status before closing
+         for (Stream s : streamManager.getAcquiredStreams().values()) {
+             StreamImpl stream = (StreamImpl) s;
+             stream.setStatus(StreamStatus.ERROR);
+         }
+
+         Future<List<Void>> closeResult = localService.closeStreams();
+         List<Void> closedStreams = Await.result(closeResult);
+         assertEquals("There should be " + numStreams + " streams closed",
+             numStreams, closedStreams.size());
+         // all pending writes should have failed or been rejected
+         for (Future<WriteResponse> future : futureList) {
+             WriteResponse response = Await.result(future);
+             assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : "
+                 + response.getHeader().getCode(),
+                 StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
+                     || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+                     || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
+         }
+         // acquired streams should all have been removed after we close them
+         assertTrue("There should be no streams in the acquired cache",
+             streamManager.getAcquiredStreams().isEmpty());
+     } finally {
+         // Shut the service down on both the success and the failure path.
+         localService.shutdown();
+     }
+     // cached streams wouldn't be removed immediately after streams are closed
+     // but they should be removed after we shutdown the service
+     assertTrue("There should be no streams in the cache after shutting down the service",
+         streamManager.getCachedStreams().isEmpty());
+ }
+
+ /**
+  * Shuts the shared service down, then submits a write and expects it to be
+  * answered with {@code StatusCode.SERVICE_UNAVAILABLE} and both the cached and
+  * the acquired stream maps to be empty.
+  */
+ @Test(timeout = 60000)
+ public void testShutdown() throws Exception {
+     service.shutdown();
+     final StreamManagerImpl manager = (StreamManagerImpl) service.getStreamManager();
+
+     // Submit a single write after the shutdown and wait for its response.
+     final String stream = testName.getMethodName();
+     final WriteResponse writeResponse = Await.result(service.write(stream, createRecord(0L)));
+
+     assertEquals(
+         "Write should fail with " + StatusCode.SERVICE_UNAVAILABLE,
+         StatusCode.SERVICE_UNAVAILABLE,
+         writeResponse.getHeader().getCode());
+     assertTrue(
+         "There should be no streams created after shutdown",
+         manager.getCachedStreams().isEmpty());
+     assertTrue(
+         "There should be no streams acquired after shutdown",
+         manager.getAcquiredStreams().isEmpty());
+ }
+
+ /**
+  * Exercises {@code getOwner} twice: first for "stream-1", which this test never
+  * creates, and then for "stream-2" after a write op has been submitted to it
+  * and completed successfully. Both lookups are expected to answer
+  * {@code StatusCode.FOUND} with this service's address as the location.
+  */
+ @Test(timeout = 60000)
+ public void testGetOwner() throws Exception {
+     LocalRoutingService routingService = (LocalRoutingService) service.getRoutingService();
+     routingService
+         .addHost("stream-0", service.getServiceAddress().getSocketAddress())
+         .setAllowRetrySameHost(false);
+
+     service.startPlacementPolicy();
+
+     // lookup for a stream this test has not touched
+     WriteResponse ownerResponse =
+         FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
+     assertEquals(StatusCode.FOUND, ownerResponse.getHeader().getCode());
+     String expectedLocation = service.getServiceAddress().toString();
+     assertEquals(expectedLocation, ownerResponse.getHeader().getLocation());
+
+     // have the service cache "stream-2"
+     StreamImpl cachedStream =
+         (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false);
+     // submit a write op to stream-2 so that the service acquires the stream
+     WriteOp writeOp = createWriteOp(service, "stream-2", 0L);
+     cachedStream.submit(writeOp);
+     cachedStream.start();
+     WriteResponse writeResponse = Await.result(writeOp.result());
+     assertEquals("Op should succeed",
+         StatusCode.SUCCESS, writeResponse.getHeader().getCode());
+     assertEquals("Service should acquire stream",
+         StreamStatus.INITIALIZED, cachedStream.getStatus());
+     assertNotNull(cachedStream.getManager());
+     assertNotNull(cachedStream.getWriter());
+     assertNull(cachedStream.getLastException());
+
+     // lookup again now that the stream is acquired
+     ownerResponse = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
+     assertEquals(StatusCode.FOUND, ownerResponse.getHeader().getCode());
+     expectedLocation = service.getServiceAddress().toString();
+     assertEquals(expectedLocation, ownerResponse.getHeader().getLocation());
+ }
+
+}