You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:43 UTC
[38/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
new file mode 100644
index 0000000..d2d61a9
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addresses;
+import com.twitter.finagle.addr.WeightedAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link RoutingService}.
+ */
+@RunWith(Parameterized.class)
+public class TestRoutingService {
+
+ static final Logger LOG = LoggerFactory.getLogger(TestRoutingService.class);
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> configs() {
+ ArrayList<Object[]> list = new ArrayList<Object[]>();
+ for (int i = 0; i <= 1; i++) {
+ for (int j = 0; j <= 1; j++) {
+ for (int k = 0; k <= 1; k++) {
+ list.add(new Boolean[] {i == 1, j == 1, k == 1});
+ }
+ }
+ }
+ return list;
+ }
+
+ private final boolean consistentHash;
+ private final boolean weightedAddresses;
+ private final boolean asyncResolution;
+
+ public TestRoutingService(boolean consistentHash, boolean weightedAddresses, boolean asyncResolution) {
+ this.consistentHash = consistentHash;
+ this.weightedAddresses = weightedAddresses;
+ this.asyncResolution = asyncResolution;
+ }
+
+ private List<Address> getAddresses(boolean weightedAddresses) {
+ ArrayList<Address> addresses = new ArrayList<Address>();
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.2", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.3", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.4", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.5", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.6", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.7", 3181)));
+
+ if (weightedAddresses) {
+ ArrayList<Address> wAddresses = new ArrayList<Address>();
+ for (Address address: addresses) {
+ wAddresses.add(WeightedAddress.apply(address, 1.0));
+ }
+ return wAddresses;
+ } else {
+ return addresses;
+ }
+ }
+
+ private void testRoutingServiceHelper(boolean consistentHash,
+ boolean weightedAddresses,
+ boolean asyncResolution)
+ throws Exception {
+ ExecutorService executorService = null;
+ final List<Address> addresses = getAddresses(weightedAddresses);
+ final TestName name = new TestName();
+ RoutingService routingService;
+ if (consistentHash) {
+ routingService = ConsistentHashRoutingService.newBuilder()
+ .serverSet(new NameServerSet(name))
+ .resolveFromName(true)
+ .numReplicas(997)
+ .build();
+ } else {
+ routingService = ServerSetRoutingService.newServerSetRoutingServiceBuilder()
+ .serverSetWatcher(new TwitterServerSetWatcher(new NameServerSet(name), true)).build();
+ }
+
+ if (asyncResolution) {
+ executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ name.changeAddrs(addresses);
+ }
+ });
+ } else {
+ name.changeAddrs(addresses);
+ }
+ routingService.startService();
+
+ HashSet<SocketAddress> mapping = new HashSet<SocketAddress>();
+
+ for (int i = 0; i < 1000; i++) {
+ for (int j = 0; j < 5; j++) {
+ String stream = "TestStream-" + i + "-" + j;
+ mapping.add(routingService.getHost(stream,
+ RoutingService.RoutingContext.of(new DefaultRegionResolver())));
+ }
+ }
+
+ assertEquals(mapping.size(), addresses.size());
+
+ if (null != executorService) {
+ executorService.shutdown();
+ }
+
+ }
+
+ @Test(timeout = 5000)
+ public void testRoutingService() throws Exception {
+ testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, this.asyncResolution);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..ab0cb58
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import com.twitter.util.CountDownLatch;
+import com.twitter.util.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test Case for {@link DefaultSpeculativeRequestExecutionPolicy}.
+ */
+public class TestDefaultSpeculativeRequestExecutionPolicy {
+
+ @Test(timeout = 20000, expected = IllegalArgumentException.class)
+ public void testInvalidBackoffMultiplier() throws Exception {
+ new DefaultSpeculativeRequestExecutionPolicy(100, 200, -1);
+ }
+
+ @Test(timeout = 20000, expected = IllegalArgumentException.class)
+ public void testInvalidMaxSpeculativeTimeout() throws Exception {
+ new DefaultSpeculativeRequestExecutionPolicy(100, Integer.MAX_VALUE, 2);
+ }
+
+ @Test(timeout = 20000)
+ public void testSpeculativeRequests() throws Exception {
+ DefaultSpeculativeRequestExecutionPolicy policy =
+ new DefaultSpeculativeRequestExecutionPolicy(10, 10000, 2);
+ SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class);
+
+ final AtomicInteger callCount = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(3);
+
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ try {
+ return Future.value(callCount.incrementAndGet() < 3);
+ } finally {
+ latch.countDown();
+ }
+ }
+ }).when(executor).issueSpeculativeRequest();
+
+ ScheduledExecutorService executorService =
+ Executors.newSingleThreadScheduledExecutor();
+ policy.initiateSpeculativeRequest(executorService, executor);
+
+ latch.await();
+
+ assertEquals(40, policy.getNextSpeculativeRequestTimeout());
+ }
+
+ @Test(timeout = 20000)
+ public void testSpeculativeRequestsWithMaxTimeout() throws Exception {
+ DefaultSpeculativeRequestExecutionPolicy policy =
+ new DefaultSpeculativeRequestExecutionPolicy(10, 15, 2);
+ SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class);
+
+ final AtomicInteger callCount = new AtomicInteger(0);
+ final CountDownLatch latch = new CountDownLatch(3);
+
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ try {
+ return Future.value(callCount.incrementAndGet() < 3);
+ } finally {
+ latch.countDown();
+ }
+ }
+ }).when(executor).issueSpeculativeRequest();
+
+ ScheduledExecutorService executorService =
+ Executors.newSingleThreadScheduledExecutor();
+ policy.initiateSpeculativeRequest(executorService, executor);
+
+ latch.await();
+
+ assertEquals(15, policy.getNextSpeculativeRequestTimeout());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java b/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
new file mode 100644
index 0000000..d2df9a5
--- /dev/null
+++ b/distributedlog-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertFalse;
+
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import org.junit.Test;
+
+/**
+ * Test Case of {@link org.apache.distributedlog.service.DistributedLogClientBuilder}.
+ */
+public class TestDistributedLogClientBuilder {
+
+ @Test(timeout = 60000)
+ public void testBuildClientsFromSameBuilder() throws Exception {
+ DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
+ .name("build-clients-from-same-builder")
+ .clientId(ClientId$.MODULE$.apply("test-builder"))
+ .finagleNameStr("inet!127.0.0.1:7001")
+ .streamNameRegex(".*")
+ .handshakeWithClientInfo(true)
+ .clientBuilder(ClientBuilder.get()
+ .hostConnectionLimit(1)
+ .connectTimeout(Duration.fromSeconds(1))
+ .tcpConnectTimeout(Duration.fromSeconds(1))
+ .requestTimeout(Duration.fromSeconds(10)));
+ DistributedLogClient client1 = builder.build();
+ DistributedLogClient client2 = builder.build();
+ assertFalse(client1 == client2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-core/conf/log4j.properties b/distributedlog-core/conf/log4j.properties
index cafc888..38ab34d 100644
--- a/distributedlog-core/conf/log4j.properties
+++ b/distributedlog-core/conf/log4j.properties
@@ -32,11 +32,11 @@ log4j.logger.org.apache.zookeeper=INFO
log4j.logger.org.apache.bookkeeper=INFO
# redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.com.twitter.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
+log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
+log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.com.twitter.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
+log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
+log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
log4j.appender.Executors=org.apache.log4j.RollingFileAppender
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/conf/zookeeper.conf.dynamic.template
----------------------------------------------------------------------
diff --git a/distributedlog-core/conf/zookeeper.conf.dynamic.template b/distributedlog-core/conf/zookeeper.conf.dynamic.template
index 4bda9f1..f4e35f5 100644
--- a/distributedlog-core/conf/zookeeper.conf.dynamic.template
+++ b/distributedlog-core/conf/zookeeper.conf.dynamic.template
@@ -1 +1 @@
-#/**# * Copyright 2007 The Apache Software Foundation# *# * Licensed to the Apache Software Foundation (ASF) under one# * or more contributor license agreements. See the NOTICE file# * distributed with this work for additional information# * regarding copyright ownership. The ASF licenses this file# * to you under the Apache License, Version 2.0 (the# * "License"); you may not use this file except in compliance# * with the License. You may obtain a copy of the License at# *# * http://www.apache.org/licenses/LICENSE-2.0# *# * Unless required by applicable law or agreed to in writing, software# * distributed under the License is distributed on an "AS IS" BASIS,# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# * See the License for the specific language governing permissions and# * limitations under the License.# */server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181
\ No newline at end of file
+#/**# * Copyright 2007 The Apache Software Foundation# *# * Licensed to the Apache Software Foundation (ASF) under one# * or more contributor license agreements. See the NOTICE file# * distributed with this work for additional information# * regarding copyright ownership. The ASF licenses this file# * to you under the Apache License, Version 2.0 (the# * "License"); you may not use this file except in compliance# * with the License. You may obtain a copy of the License at# *# * http://www.apache.org/licenses/LICENSE-2.0# *# * Unless required by applicable law or agreed to in writing, software# * distributed under the License is distributed on an "AS IS" BASIS,# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# * See the License for the specific language governing permissions and# * limitations under the License.# */server.1=127.0.0.1:2710:3710:participant;0.0.0.0:2181
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core/pom.xml b/distributedlog-core/pom.xml
index c5329aa..c4bfa8f 100644
--- a/distributedlog-core/pom.xml
+++ b/distributedlog-core/pom.xml
@@ -206,7 +206,7 @@
<properties>
<property>
<name>listener</name>
- <value>com.twitter.distributedlog.TimedOutTestsListener</value>
+ <value>org.apache.distributedlog.TimedOutTestsListener</value>
</property>
</properties>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java
deleted file mode 100644
index 0f93bfe..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamReader.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AppendOnlyStreamReader extends InputStream {
- static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamReader.class);
-
- private LogRecordWithInputStream currentLogRecord = null;
- private final DistributedLogManager dlm;
- private LogReader reader;
- private long currentPosition;
- private static final int SKIP_BUFFER_SIZE = 512;
-
- // Cache the input stream for a log record.
- private static class LogRecordWithInputStream {
- private final InputStream payloadStream;
- private final LogRecordWithDLSN logRecord;
-
- LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
- Preconditions.checkNotNull(logRecord);
-
- LOG.debug("Got record dlsn = {}, txid = {}, len = {}",
- new Object[] {logRecord.getDlsn(), logRecord.getTransactionId(), logRecord.getPayload().length});
-
- this.logRecord = logRecord;
- this.payloadStream = logRecord.getPayLoadInputStream();
- }
-
- InputStream getPayLoadInputStream() {
- return payloadStream;
- }
-
- LogRecordWithDLSN getLogRecord() {
- return logRecord;
- }
-
- // The last txid of the log record is the position of the next byte in the stream.
- // Subtract length to get starting offset.
- long getOffset() {
- return logRecord.getTransactionId() - logRecord.getPayload().length;
- }
- }
-
- /**
- * Construct ledger input stream
- *
- * @param dlm the Distributed Log Manager to access the stream
- */
- AppendOnlyStreamReader(DistributedLogManager dlm)
- throws IOException {
- this.dlm = dlm;
- reader = dlm.getInputStream(0);
- currentPosition = 0;
- }
-
- /**
- * Get input stream representing next entry in the
- * ledger.
- *
- * @return input stream, or null if no more entries
- */
- private LogRecordWithInputStream nextLogRecord() throws IOException {
- return nextLogRecord(reader);
- }
-
- private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException {
- LogRecordWithDLSN record = reader.readNext(false);
-
- if (null != record) {
- return new LogRecordWithInputStream(record);
- } else {
- record = reader.readNext(false);
- if (null != record) {
- return new LogRecordWithInputStream(record);
- } else {
- LOG.debug("No record");
- return null;
- }
- }
- }
-
- @Override
- public int read() throws IOException {
- byte[] b = new byte[1];
- if (read(b, 0, 1) != 1) {
- return -1;
- } else {
- return b[0];
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- int read = 0;
- if (currentLogRecord == null) {
- currentLogRecord = nextLogRecord();
- if (currentLogRecord == null) {
- return read;
- }
- }
-
- while (read < len) {
- int thisread = currentLogRecord.getPayLoadInputStream().read(b, off + read, (len - read));
- if (thisread == -1) {
- currentLogRecord = nextLogRecord();
- if (currentLogRecord == null) {
- return read;
- }
- } else {
- LOG.debug("Offset saved = {}, persisted = {}",
- currentPosition, currentLogRecord.getLogRecord().getTransactionId());
- currentPosition += thisread;
- read += thisread;
- }
- }
- return read;
- }
-
- /**
- * Position the reader at the given offset. If we fail to skip to the desired position
- * and don't hit end of stream, return false.
- *
- * @throws com.twitter.distributedlog.exceptions.EndOfStreamException if we attempt to
- * skip past the end of the stream.
- */
- public boolean skipTo(long position) throws IOException {
-
- // No need to skip anywhere.
- if (position == position()) {
- return true;
- }
-
- LogReader skipReader = dlm.getInputStream(position);
- LogRecordWithInputStream logRecord = null;
- try {
- logRecord = nextLogRecord(skipReader);
- } catch (IOException ex) {
- skipReader.close();
- throw ex;
- }
-
- if (null == logRecord) {
- return false;
- }
-
- // We may end up with a reader positioned *before* the requested position if
- // we're near the tail and the writer is still active, or if the desired position
- // is not at a log record payload boundary.
- // Transaction ID gives us the starting position of the log record. Read ahead
- // if necessary.
- currentPosition = logRecord.getOffset();
- currentLogRecord = logRecord;
- LogReader oldReader = reader;
- reader = skipReader;
-
- // Close the oldreader after swapping AppendOnlyStreamReader state. Close may fail
- // and we need to make sure it leaves AppendOnlyStreamReader in a consistent state.
- oldReader.close();
-
- byte[] skipBuffer = new byte[SKIP_BUFFER_SIZE];
- while (currentPosition < position) {
- long bytesToRead = Math.min(position - currentPosition, SKIP_BUFFER_SIZE);
- long bytesRead = read(skipBuffer, 0, (int)bytesToRead);
- if (bytesRead < bytesToRead) {
- return false;
- }
- }
-
- return true;
- }
-
- public long position() {
- return currentPosition;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java
deleted file mode 100644
index aa0aef9..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AppendOnlyStreamWriter.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AppendOnlyStreamWriter implements Closeable {
- static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamWriter.class);
-
- // Use a 1-length array to satisfy Java's inner class reference rules. Use primitive
- // type because synchronized block is needed anyway.
- final long[] syncPos = new long[1];
- BKAsyncLogWriter logWriter;
- long requestPos = 0;
-
- public AppendOnlyStreamWriter(BKAsyncLogWriter logWriter, long pos) {
- LOG.debug("initialize at position {}", pos);
- this.logWriter = logWriter;
- this.syncPos[0] = pos;
- this.requestPos = pos;
- }
-
- public Future<DLSN> write(byte[] data) {
- requestPos += data.length;
- Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
- return writeResult.addEventListener(new WriteCompleteListener(requestPos));
- }
-
- public void force(boolean metadata) throws IOException {
- long pos = 0;
- try {
- pos = Await.result(logWriter.flushAndCommit());
- } catch (IOException ioe) {
- throw ioe;
- } catch (Exception ex) {
- LOG.error("unexpected exception in AppendOnlyStreamWriter.force ", ex);
- throw new UnexpectedException("unexpected exception in AppendOnlyStreamWriter.force", ex);
- }
- synchronized (syncPos) {
- syncPos[0] = pos;
- }
- }
-
- public long position() {
- synchronized (syncPos) {
- return syncPos[0];
- }
- }
-
- @Override
- public void close() throws IOException {
- logWriter.closeAndComplete();
- }
-
- public void markEndOfStream() throws IOException {
- try {
- Await.result(logWriter.markEndOfStream());
- } catch (IOException ioe) {
- throw ioe;
- } catch (Exception ex) {
- throw new UnexpectedException("Mark end of stream hit unexpected exception", ex);
- }
- }
-
- class WriteCompleteListener implements FutureEventListener<DLSN> {
- private final long position;
- public WriteCompleteListener(long position) {
- this.position = position;
- }
- @Override
- public void onSuccess(DLSN response) {
- synchronized (syncPos) {
- if (position > syncPos[0]) {
- syncPos[0] = position;
- }
- }
- }
- @Override
- public void onFailure(Throwable cause) {
- // Handled at the layer above
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java
deleted file mode 100644
index 8e07797..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogReader.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public interface AsyncLogReader extends AsyncCloseable {
-
- /**
- * Get stream name that the reader reads from.
- *
- * @return stream name.
- */
- public String getStreamName();
-
- /**
- * Read the next record from the log stream
- *
- * @return A promise that when satisfied will contain the Log Record with its DLSN.
- */
- public Future<LogRecordWithDLSN> readNext();
-
- /**
- * Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list
- * of entries. It doesn't block until returning exact <i>numEntries</i>. It is a best effort
- * call.
- *
- * @param numEntries
- * num entries
- * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
- */
- public Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
-
- /**
- * Read next <i>numEntries</i> entries in a given <i>waitTime</i>.
- * <p>
- * The future is satisfied when either reads <i>numEntries</i> entries or reaches <i>waitTime</i>.
- * The only exception is if there isn't any new entries written within <i>waitTime</i>, it would
- * wait until new entries are available.
- *
- * @param numEntries
- * max entries to return
- * @param waitTime
- * maximum wait time if there are entries already for read
- * @param timeUnit
- * wait time unit
- * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
- */
- public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java
deleted file mode 100644
index e83e343..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncLogWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.io.AsyncAbortable;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.io.Closeable;
-import java.util.List;
-
-public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable {
-
-    /**
-     * Returns the last committed transaction id.
-     *
-     * @return the last committed transaction id.
-     */
-    long getLastTxId();
-
-    /**
-     * Writes a single log record to the stream.
-     *
-     * @param record the log record to write
-     * @return a future holding the record's DLSN if the write succeeds,
-     *         or an exception if the write fails.
-     */
-    Future<DLSN> write(LogRecord record);
-
-    /**
-     * Writes log records to the stream in bulk.
-     *
-     * <p>Each future in the resulting list represents the outcome of one write operation. The
-     * result list has the same size as the input list. Buffers are written in order, and the
-     * result futures are listed in that same order.
-     *
-     * @param record the list of log records to write
-     * @return a future holding a list of per-record DLSN futures if the operation succeeds,
-     *         or an exception if the operation fails.
-     */
-    Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);
-
-    /**
-     * Truncates the log until <i>dlsn</i>.
-     *
-     * @param dlsn
-     *          dlsn to truncate until
-     * @return a future indicating whether the operation succeeded, or an exception
-     *         if the truncation fails.
-     */
-    Future<Boolean> truncate(DLSN dlsn);
-
-    /**
-     * Returns the name of the stream this writer writes data to.
-     *
-     * @return the stream name.
-     */
-    String getStreamName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
deleted file mode 100644
index bd71147..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/AsyncNotification.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-public interface AsyncNotification {
- /**
- * Triggered when the background activity encounters an exception.
- *
- * @param reason the exception that was encountered.
- */
- void notifyOnError(Throwable reason);
-
- /**
- * Triggered when the background activity completes an operation.
- */
- void notifyOnOperationComplete();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
deleted file mode 100644
index d1c28d7..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAbstractLogWriter.java
+++ /dev/null
@@ -1,555 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.io.Abortable;
-import com.twitter.distributedlog.io.Abortables;
-import com.twitter.distributedlog.io.AsyncAbortable;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.PermitManager;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable {
- // Logger shared by this writer (package-private).
- static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class);
-
- // Static configuration handed in through the constructor.
- protected final DistributedLogConfiguration conf;
- // Dynamic configuration; this class reads the retention period (in hours) from it.
- private final DynamicDistributedLogConfiguration dynConf;
- // Owning log manager; supplies the scheduler, the write handler and the rolling permit manager.
- protected final BKDistributedLogManager bkDistributedLogManager;
-
- // States
- // Set by the first call to closeNoThrow()/asyncAbort(); a non-null value makes
- // checkClosedOrInError() fail.
- private Promise<Void> closePromise = null;
- // When true, rolling to a new log segment is forced (see setForceRolling).
- private volatile boolean forceRolling = false;
- // Consulted by asyncGetLedgerWriter() to close and complete the cached segment
- // (see setForceRecovery).
- private boolean forceRecovery = false;
-
- // Truncation Related
- // Future of the most recent purge started by truncateLogSegmentsIfNecessary();
- // cancelled and cleared by cancelTruncation().
- private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null;
- // When non-null, replaces the retention-derived minimum timestamp to keep.
- @VisibleForTesting
- private Long minTimestampToKeepOverride = null;
-
- // Log Segment Writers
- // Current segment writer and its already-satisfied future; both are set together by
- // cacheLogWriter() and cleared together by removeCachedLogWriter().
- protected BKLogSegmentWriter segmentWriter = null;
- protected Future<BKLogSegmentWriter> segmentWriterFuture = null;
- // Writer of a newly allocated segment, held while the old segment is being completed.
- protected BKLogSegmentWriter allocatedSegmentWriter = null;
- // Write handler, created and cached by createAndCacheWriteHandler().
- protected BKLogWriteHandler writeHandler = null;
-
-    /**
-     * Creates a writer bound to the given log manager and logs (at debug level) the
-     * initial retention period in milliseconds.
-     *
-     * @param conf    static configuration
-     * @param dynConf dynamic configuration; the retention period is read from it
-     * @param bkdlm   log manager this writer belongs to
-     */
-    BKAbstractLogWriter(DistributedLogConfiguration conf,
-                        DynamicDistributedLogConfiguration dynConf,
-                        BKDistributedLogManager bkdlm) {
-        this.conf = conf;
-        this.dynConf = dynConf;
-        this.bkDistributedLogManager = bkdlm;
-
-        final String streamName = bkdlm.getStreamName();
-        final long retentionPeriodMs =
-                TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS);
-        LOG.debug("Initial retention period for {} : {}", streamName, retentionPeriodMs);
-    }
-
- // manage write handler
-
-    /**
-     * Returns the cached write handler without creating one.
-     *
-     * @return the cached write handler, or null if none has been cached.
-     */
-    protected synchronized BKLogWriteHandler getCachedWriteHandler() {
-        return this.writeHandler;
-    }
-
-    /**
-     * Returns the write handler, creating and caching it when necessary, after asking the
-     * handler to check for a metadata exception.
-     *
-     * @return the write handler.
-     * @throws IOException if creating the handler or its metadata check fails.
-     */
-    protected BKLogWriteHandler getWriteHandler() throws IOException {
-        final BKLogWriteHandler handler = createAndCacheWriteHandler();
-        handler.checkMetadataException();
-        return handler;
-    }
-
-    /**
-     * Returns the cached write handler, creating and caching a new one if none is cached.
-     *
-     * <p>The new handler is created outside the lock. If another handler was cached in the
-     * meantime, that one is returned and the freshly created handler is aborted.
-     *
-     * @return the cached write handler.
-     * @throws IOException if creating the write handler fails.
-     */
-    protected BKLogWriteHandler createAndCacheWriteHandler()
-            throws IOException {
-        synchronized (this) {
-            if (null != writeHandler) {
-                return writeHandler;
-            }
-        }
-        // This code path will be executed when the handler is not set or has been closed
-        // due to forceRecovery during testing
-        final BKLogWriteHandler createdHandler =
-                FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false));
-        boolean installed = false;
-        final BKLogWriteHandler handlerToUse;
-        synchronized (this) {
-            if (null == writeHandler) {
-                writeHandler = createdHandler;
-                installed = true;
-            }
-            handlerToUse = writeHandler;
-        }
-        if (!installed) {
-            // someone else cached a handler first; discard ours
-            createdHandler.asyncAbort();
-        }
-        return handlerToUse;
-    }
-
- // manage log segment writers
-
-    /**
-     * Returns the currently cached log segment writer.
-     *
-     * @return the cached log segment writer, or null if none is cached.
-     */
-    protected synchronized BKLogSegmentWriter getCachedLogWriter() {
-        return this.segmentWriter;
-    }
-
-    /**
-     * Returns the future wrapping the currently cached log segment writer.
-     *
-     * @return the cached writer future, or null if none is cached.
-     */
-    protected synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() {
-        return this.segmentWriterFuture;
-    }
-
-    /**
-     * Caches the given log segment writer together with an already-satisfied future for it.
-     *
-     * @param logWriter the log segment writer to cache
-     */
-    protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) {
-        segmentWriter = logWriter;
-        segmentWriterFuture = Future.value(logWriter);
-    }
-
-    /**
-     * Clears the cached log segment writer and its future.
-     *
-     * @return the writer that was cached before the call, or null if there was none.
-     */
-    protected synchronized BKLogSegmentWriter removeCachedLogWriter() {
-        final BKLogSegmentWriter previous = segmentWriter;
-        segmentWriter = null;
-        segmentWriterFuture = null;
-        return previous;
-    }
-
-    /**
-     * Returns the writer of the allocated (not yet active) log segment.
-     *
-     * @return the allocated log segment writer, or null if none is cached.
-     */
-    protected synchronized BKLogSegmentWriter getAllocatedLogWriter() {
-        return this.allocatedSegmentWriter;
-    }
-
-    /**
-     * Caches the writer of a newly allocated log segment.
-     *
-     * @param logWriter the allocated log segment writer to cache
-     */
-    protected synchronized void cacheAllocatedLogWriter(BKLogSegmentWriter logWriter) {
-        allocatedSegmentWriter = logWriter;
-    }
-
-    /**
-     * Clears the cached allocated log segment writer.
-     *
-     * @return the allocated writer that was cached before the call, or null if there was none.
-     */
-    protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() {
-        final BKLogSegmentWriter previous = allocatedSegmentWriter;
-        allocatedSegmentWriter = null;
-        return previous;
-    }
-
-    /**
-     * Completes the cached log segment (if any) and closes this writer.
-     *
-     * <p>When either the cached segment writer or the cached write handler is missing, the
-     * writer is simply closed via {@link #closeNoThrow()}.
-     *
-     * @param shouldThrow whether a failure to complete the log segment should fail the
-     *                    returned future
-     * @return a future satisfied once completing and closing has finished.
-     */
-    private Future<Void> asyncCloseAndComplete(boolean shouldThrow) {
-        final BKLogSegmentWriter cachedWriter = getCachedLogWriter();
-        final BKLogWriteHandler cachedHandler = getCachedWriteHandler();
-        if (null == cachedWriter || null == cachedHandler) {
-            return closeNoThrow();
-        }
-        cancelTruncation();
-        final Promise<Void> completePromise = new Promise<Void>();
-        asyncCloseAndComplete(cachedWriter, cachedHandler, completePromise, shouldThrow);
-        return completePromise;
-    }
-
-    /**
-     * Completes and closes the given log segment, then closes this writer and satisfies
-     * <i>completePromise</i>.
-     *
-     * <p>The writer is closed via {@link #closeNoThrow()} whether or not completing the segment
-     * succeeded. The promise fails with the completion error only when <i>shouldThrow</i> is
-     * set; otherwise it is satisfied with null.
-     *
-     * @param segmentWriter   writer of the log segment to complete
-     * @param writeHandler    handler used to complete the log segment
-     * @param completePromise promise to satisfy when everything is done
-     * @param shouldThrow     whether a completion failure is propagated through the promise
-     */
-    private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter,
-                                       final BKLogWriteHandler writeHandler,
-                                       final Promise<Void> completePromise,
-                                       final boolean shouldThrow) {
-        final FutureEventListener<LogSegmentMetadata> completionListener =
-                new FutureEventListener<LogSegmentMetadata>() {
-                    @Override
-                    public void onSuccess(LogSegmentMetadata completedSegment) {
-                        removeCachedLogWriter();
-                        closeThenSignal(null);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable failure) {
-                        LOG.error("Completing Log segments encountered exception", failure);
-                        closeThenSignal(failure);
-                    }
-
-                    // close the writer first, then report the outcome through the promise
-                    private void closeThenSignal(final Throwable failure) {
-                        closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() {
-                            @Override
-                            public BoxedUnit apply() {
-                                final boolean propagate = null != failure && shouldThrow;
-                                if (propagate) {
-                                    FutureUtils.setException(completePromise, failure);
-                                } else {
-                                    FutureUtils.setValue(completePromise, null);
-                                }
-                                return BoxedUnit.UNIT;
-                            }
-                        });
-                    }
-                };
-        writeHandler.completeAndCloseLogSegment(segmentWriter)
-                .addEventListener(completionListener);
-    }
-
-    /**
-     * Synchronous variant of {@link #asyncCloseAndComplete()}: completes the cached log
-     * segment and closes the writer, propagating a completion failure.
-     *
-     * @throws IOException if completing or closing fails.
-     */
-    @VisibleForTesting
-    void closeAndComplete() throws IOException {
-        final Future<Void> completion = asyncCloseAndComplete(true);
-        FutureUtils.result(completion);
-    }
-
-    /**
-     * Completes the cached log segment and closes the writer; a failure to complete the
-     * segment fails the returned future.
-     *
-     * @return a future satisfied once completing and closing has finished.
-     */
-    protected Future<Void> asyncCloseAndComplete() {
-        final boolean shouldThrow = true;
-        return asyncCloseAndComplete(shouldThrow);
-    }
-
-    /**
-     * Synchronous variant of {@link #asyncClose()}.
-     *
-     * @throws IOException if closing fails.
-     */
-    @Override
-    public void close() throws IOException {
-        final Future<Void> closing = asyncClose();
-        FutureUtils.result(closing);
-    }
-
-    /**
-     * Completes the cached log segment and closes the writer; a failure to complete the
-     * segment is not propagated through the returned future.
-     *
-     * @return a future satisfied once completing and closing has finished.
-     */
-    @Override
-    public Future<Void> asyncClose() {
-        final boolean shouldThrow = false;
-        return asyncCloseAndComplete(shouldThrow);
-    }
-
-    /**
-     * Closes the writer and releases all the underlying resources.
-     *
-     * <p>Only the first call to this method or {@link #asyncAbort()} starts closing; later
-     * calls return the promise created by that first call.
-     *
-     * @return a future satisfied once the close sequence has finished.
-     */
-    protected Future<Void> closeNoThrow() {
-        final Promise<Void> promise;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            promise = new Promise<Void>();
-            closePromise = promise;
-        }
-        cancelTruncation();
-        Utils.closeSequence(bkDistributedLogManager.getScheduler(),
-                true, /** ignore close errors **/
-                getCachedLogWriter(),
-                getAllocatedLogWriter(),
-                getCachedWriteHandler()
-        ).proxyTo(promise);
-        return promise;
-    }
-
-    /**
-     * Synchronous variant of {@link #asyncAbort()}.
-     *
-     * @throws IOException if aborting fails.
-     */
-    @Override
-    public void abort() throws IOException {
-        final Future<Void> aborting = asyncAbort();
-        FutureUtils.result(aborting);
-    }
-
-    /**
-     * Aborts the cached segment writer, the allocated segment writer and the write handler.
-     *
-     * <p>Only the first call to this method or {@link #closeNoThrow()} starts the sequence;
-     * later calls return the promise created by that first call.
-     *
-     * @return a future satisfied once the abort sequence has finished.
-     */
-    @Override
-    public Future<Void> asyncAbort() {
-        final Promise<Void> promise;
-        synchronized (this) {
-            if (null != closePromise) {
-                return closePromise;
-            }
-            promise = new Promise<Void>();
-            closePromise = promise;
-        }
-        cancelTruncation();
-        Abortables.abortSequence(bkDistributedLogManager.getScheduler(),
-                getCachedLogWriter(),
-                getAllocatedLogWriter(),
-                getCachedWriteHandler()).proxyTo(promise);
-        return promise;
-    }
-
-    /**
-     * Returns the log segment writer to write with, rolling to a new log segment when there
-     * is no usable writer or when a new segment should be started. Used by the sync writer.
-     *
-     * @param startTxId    transaction id to start a new log segment from, if one is started
-     * @param allowMaxTxID passed through to the log segment rolling
-     * @return the log segment writer to use.
-     * @throws IOException if obtaining or rolling the log segment writer fails.
-     */
-    protected BKLogSegmentWriter getLedgerWriter(final long startTxId,
-                                                 final boolean allowMaxTxID)
-            throws IOException {
-        final Future<BKLogSegmentWriter> cachedFuture = asyncGetLedgerWriter(true);
-        BKLogSegmentWriter writer =
-                (null == cachedFuture) ? null : FutureUtils.result(cachedFuture);
-        final boolean needsRoll =
-                null == writer || shouldStartNewSegment(writer) || forceRolling;
-        if (needsRoll) {
-            writer = FutureUtils.result(rollLogSegmentIfNecessary(
-                    writer, startTxId, true /* bestEffort */, allowMaxTxID));
-        }
-        return writer;
-    }
-
- // used by async writer
- /**
- * Get the future of the cached log segment writer, resetting the cached writer first when
- * it is in error (or when recovery is forced) and <i>resetOnError</i> is set.
- *
- * @param resetOnError
- * whether to abort/close the cached writer when it is in error or recovery is forced
- * @return null if no writer is cached; the cached writer future if no reset is needed;
- * otherwise a future that is satisfied with null once the cached writer has been reset,
- * or fails if the reset fails.
- */
- synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) {
- final BKLogSegmentWriter ledgerWriter = getCachedLogWriter();
- Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture();
- // nothing cached yet: the caller has to start a new log segment
- if (null == ledgerWriterFuture || null == ledgerWriter) {
- return null;
- }
-
- // Handle the case where the last call to write actually caused an error in the log
- if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) {
- // Close the ledger writer so that we will recover and start a new log segment
- Future<Void> closeFuture;
- // a writer in error is aborted; a healthy one (forced recovery) is closed
- if (ledgerWriter.isLogSegmentInError()) {
- closeFuture = ledgerWriter.asyncAbort();
- } else {
- closeFuture = ledgerWriter.asyncClose();
- }
- return closeFuture.flatMap(
- new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() {
- @Override
- public Future<BKLogSegmentWriter> apply(Void result) {
- // drop the cached writer so that a null writer is reported to the caller
- removeCachedLogWriter();
-
- // the segment in error is not completed here
- if (ledgerWriter.isLogSegmentInError()) {
- return Future.value(null);
- }
-
- BKLogWriteHandler writeHandler;
- try {
- writeHandler = getWriteHandler();
- } catch (IOException e) {
- return Future.exception(e);
- }
- // forced recovery: complete the closed segment, then report a null writer
- if (null != writeHandler && forceRecovery) {
- return writeHandler.completeAndCloseLogSegment(ledgerWriter)
- .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() {
- @Override
- public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) {
- return null;
- }
- });
- } else {
- return Future.value(null);
- }
- }
- });
- } else {
- return ledgerWriterFuture;
- }
- }
-
-    /**
-     * Tells whether a new log segment should be started.
-     *
-     * @param ledgerWriter the current log segment writer, may be null
-     * @return true if there is no current writer, the write handler asks for a new segment,
-     *         or rolling is forced.
-     * @throws IOException if the write handler cannot be obtained.
-     */
-    boolean shouldStartNewSegment(BKLogSegmentWriter ledgerWriter) throws IOException {
-        final BKLogWriteHandler handler = getWriteHandler();
-        if (null == ledgerWriter) {
-            return true;
-        }
-        if (handler.shouldStartNewSegment(ledgerWriter)) {
-            return true;
-        }
-        return forceRolling;
-    }
-
-    /**
-     * Starts purging log segments older than the minimum timestamp to keep, unless truncation
-     * is disabled or a previous purge is still running.
-     *
-     * <p>Truncation is enabled when the configured retention period is positive or when a
-     * minimum-timestamp override has been set; the override takes precedence.
-     *
-     * @param writeHandler handler used to purge the old log segments
-     */
-    private void truncateLogSegmentsIfNecessary(BKLogWriteHandler writeHandler) {
-        final long retentionPeriodInMillis =
-                TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS);
-
-        synchronized (this) {
-            // Read the override exactly once, under the same lock its setter uses, so that a
-            // concurrent reset to null cannot slip in between the null check and the unboxing.
-            final Long override = minTimestampToKeepOverride;
-
-            final long minTimestampToKeep;
-            if (null != override) {
-                minTimestampToKeep = override;
-            } else if (retentionPeriodInMillis > 0) {
-                minTimestampToKeep = Utils.nowInMillis() - retentionPeriodInMillis;
-            } else {
-                // neither retention nor override is set: truncation is disabled
-                return;
-            }
-
-            // skip scheduling if there is task that's already running
-            if ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined()) {
-                lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
-            }
-        }
-    }
-
-    /**
-     * Recovers incomplete log segments, then starts a new log segment and caches its writer.
-     *
-     * @param writeHandler handler used to recover and to start the log segment
-     * @param startTxId    transaction id the new log segment starts from
-     * @param allowMaxTxID passed through to the write handler when starting the segment
-     * @return a future holding the writer of the new log segment.
-     */
-    private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
-                                                               final long startTxId,
-                                                               final boolean allowMaxTxID) {
-        // caches the writer once the new segment has been started successfully
-        final AbstractFunction1<BKLogSegmentWriter, BoxedUnit> cacheStartedWriter =
-                new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(BKLogSegmentWriter startedWriter) {
-                        cacheLogWriter(startedWriter);
-                        return BoxedUnit.UNIT;
-                    }
-                };
-        final AbstractFunction1<Long, Future<BKLogSegmentWriter>> startSegmentAfterRecovery =
-                new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() {
-                    @Override
-                    public Future<BKLogSegmentWriter> apply(Long lastTxId) {
-                        return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)
-                                .onSuccess(cacheStartedWriter);
-                    }
-                };
-        return writeHandler.recoverIncompleteLogSegments().flatMap(startSegmentAfterRecovery);
-    }
-
-    /**
-     * Rolls to a new log segment if a rolling permit can be obtained; otherwise keeps the
-     * old segment writer.
-     *
-     * <p>If rolling fails with a {@code LockingException} or a retryable {@code ZKException},
-     * further permits are disallowed and the old segment writer is returned. Any other failure
-     * is propagated. The permit is released in every case.
-     *
-     * @param oldSegmentWriter writer of the current log segment
-     * @param writeHandler     handler used to roll the log segment
-     * @param startTxId        transaction id the new log segment starts from
-     * @param bestEffort       whether rolling is best effort
-     * @param allowMaxTxID     passed through to the write handler when starting the segment
-     * @return a future holding the writer to use after the rolling attempt.
-     */
-    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(
-            final BKLogSegmentWriter oldSegmentWriter,
-            final BKLogWriteHandler writeHandler,
-            final long startTxId,
-            final boolean bestEffort,
-            final boolean allowMaxTxID) {
-        final PermitManager.Permit switchPermit =
-                bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit();
-        if (!switchPermit.isAllowed()) {
-            // no permit: give it back and keep writing to the old segment
-            bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit);
-            return Future.value(oldSegmentWriter);
-        }
-        return closeOldLogSegmentAndStartNewOne(
-                oldSegmentWriter,
-                writeHandler,
-                startTxId,
-                bestEffort,
-                allowMaxTxID
-        ).rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() {
-            @Override
-            public Future<BKLogSegmentWriter> apply(Throwable cause) {
-                if (cause instanceof LockingException) {
-                    LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ",
-                            writeHandler.getFullyQualifiedName(), cause);
-                    bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
-                    return Future.value(oldSegmentWriter);
-                }
-                if (cause instanceof ZKException) {
-                    final ZKException zke = (ZKException) cause;
-                    if (ZKException.isRetryableZKException(zke)) {
-                        LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." +
-                                " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(),
-                                zke.getKeeperExceptionCode());
-                        bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
-                        return Future.value(oldSegmentWriter);
-                    }
-                }
-                // anything else is not recoverable here
-                return Future.exception(cause);
-            }
-        }).ensure(new AbstractFunction0<BoxedUnit>() {
-            @Override
-            public BoxedUnit apply() {
-                bkDistributedLogManager.getLogSegmentRollingPermitManager()
-                        .releasePermit(switchPermit);
-                return BoxedUnit.UNIT;
-            }
-        });
-    }
-
-    /**
-     * Allocates a new log segment (unless one is already allocated), completes the old log
-     * segment and switches to the new segment writer.
-     *
-     * <p>If no new log segment could be started, the old segment writer is kept when rolling
-     * is best effort; otherwise the returned future fails with an {@code UnexpectedException}.
-     *
-     * @param oldSegmentWriter writer of the log segment to complete
-     * @param writeHandler     handler used to start the new log segment
-     * @param startTxId        transaction id the new log segment starts from
-     * @param bestEffort       whether failing to start a new log segment is tolerated
-     * @param allowMaxTxID     passed through to the write handler when starting the segment
-     * @return a future holding the writer to use after the switch.
-     */
-    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(
-            final BKLogSegmentWriter oldSegmentWriter,
-            final BKLogWriteHandler writeHandler,
-            final long startTxId,
-            final boolean bestEffort,
-            final boolean allowMaxTxID) {
-        // we switch only when we could allocate a new log segment.
-        final BKLogSegmentWriter allocatedWriter = getAllocatedLogWriter();
-        if (null != allocatedWriter) {
-            return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, allocatedWriter);
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Allocating a new log segment from {} for {}.", startTxId,
-                    writeHandler.getFullyQualifiedName());
-        }
-        return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID)
-                .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
-                    @Override
-                    public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) {
-                        if (null == newSegmentWriter) {
-                            if (bestEffort) {
-                                return Future.value(oldSegmentWriter);
-                            } else {
-                                // a null writer is only tolerated for best-effort rolling
-                                return Future.exception(
-                                        new UnexpectedException("StartLogSegment returns null for non-bestEffort rolling"));
-                            }
-                        }
-                        cacheAllocatedLogWriter(newSegmentWriter);
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Allocated a new log segment from {} for {}.", startTxId,
-                                    writeHandler.getFullyQualifiedName());
-                        }
-                        return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, newSegmentWriter);
-                    }
-                });
-    }
-
-    /**
-     * Completes the old log segment and, on success, makes the new segment writer the cached
-     * writer and clears the allocated writer.
-     *
-     * @param oldSegmentWriter writer of the log segment to complete
-     * @param newSegmentWriter writer of the already allocated new log segment
-     * @return a future holding <i>newSegmentWriter</i> once the old segment is completed, or
-     *         failing with the error raised while completing it.
-     */
-    private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(
-            BKLogSegmentWriter oldSegmentWriter,
-            final BKLogSegmentWriter newSegmentWriter) {
-        final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>();
-        // This method can run on a callback thread that does not hold the monitor, so the
-        // handler is read through the synchronized accessor rather than the bare field.
-        final BKLogWriteHandler handler = getCachedWriteHandler();
-        // complete the old log segment
-        handler.completeAndCloseLogSegment(oldSegmentWriter)
-                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
-
-                    @Override
-                    public void onSuccess(LogSegmentMetadata value) {
-                        cacheLogWriter(newSegmentWriter);
-                        removeAllocatedLogWriter();
-                        FutureUtils.setValue(completePromise, newSegmentWriter);
-                    }
-
-                    @Override
-                    public void onFailure(Throwable cause) {
-                        FutureUtils.setException(completePromise, cause);
-                    }
-                });
-        return completePromise;
-    }
-
-    /**
-     * Rolls to a new log segment when necessary.
-     *
-     * <p>With no current writer a new log segment is started. With a current writer the
-     * segment is rolled (subject to a rolling permit) if the write handler asks for a new
-     * segment or rolling is forced; otherwise the current writer is kept. Whenever the
-     * resulting writer differs from the given one, log segment truncation is triggered.
-     *
-     * @param segmentWriter the current log segment writer, may be null
-     * @param startTxId     transaction id a new log segment starts from
-     * @param bestEffort    whether rolling is best effort
-     * @param allowMaxTxID  passed through to the write handler when starting a segment
-     * @return a future holding the writer to use, or failing if the write handler cannot be
-     *         obtained or rolling fails.
-     */
-    protected synchronized Future<BKLogSegmentWriter> rollLogSegmentIfNecessary(
-            final BKLogSegmentWriter segmentWriter,
-            long startTxId,
-            boolean bestEffort,
-            boolean allowMaxTxID) {
-        final BKLogWriteHandler handler;
-        try {
-            handler = getWriteHandler();
-        } catch (IOException ioe) {
-            return Future.exception(ioe);
-        }
-        final Future<BKLogSegmentWriter> rolled;
-        if (null == segmentWriter) {
-            rolled = asyncStartNewLogSegment(handler, startTxId, allowMaxTxID);
-        } else if (handler.shouldStartNewSegment(segmentWriter) || forceRolling) {
-            rolled = closeOldLogSegmentAndStartNewOneWithPermit(
-                    segmentWriter, handler, startTxId, bestEffort, allowMaxTxID);
-        } else {
-            rolled = Future.value(segmentWriter);
-        }
-        return rolled.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() {
-            @Override
-            public BKLogSegmentWriter apply(BKLogSegmentWriter resultingWriter) {
-                // only a switch to a different writer triggers truncation
-                if (segmentWriter != resultingWriter) {
-                    truncateLogSegmentsIfNecessary(handler);
-                }
-                return resultingWriter;
-            }
-        });
-    }
-
-    /**
-     * Fails if closing or aborting this writer has already been started.
-     *
-     * @param operation name of the operation being attempted, used in the error message
-     * @throws AlreadyClosedException if the writer is already closed.
-     */
-    protected synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
-        if (null == closePromise) {
-            return;
-        }
-        final String message = "Executing " + operation + " on already closed Log Writer";
-        LOG.error(message);
-        throw new AlreadyClosedException(message);
-    }
-
-    /**
-     * Sets whether rolling to a new log segment is forced.
-     *
-     * @param forceRolling true to force log segment rolling
-     */
-    @VisibleForTesting
-    public void setForceRolling(boolean forceRolling) {
-        final boolean requested = forceRolling;
-        this.forceRolling = requested;
-    }
-
-    /**
-     * Overrides the minimum timestamp to keep that is used when truncating log segments.
-     *
-     * @param minTimestampToKeepOverride the minimum timestamp to keep, or null to clear
-     *                                   the override
-     */
-    @VisibleForTesting
-    public synchronized void overRideMinTimeStampToKeep(Long minTimestampToKeepOverride) {
-        final Long requested = minTimestampToKeepOverride;
-        this.minTimestampToKeepOverride = requested;
-    }
-
-    /**
-     * Cancels the last truncation attempt, if there is one, and forgets it.
-     */
-    protected synchronized void cancelTruncation() {
-        if (null == lastTruncationAttempt) {
-            return;
-        }
-        FutureUtils.cancel(lastTruncationAttempt);
-        lastTruncationAttempt = null;
-    }
-
-    /**
-     * Sets whether recovery of the cached log segment is forced.
-     *
-     * @param forceRecovery true to force recovery
-     */
-    @VisibleForTesting
-    public synchronized void setForceRecovery(boolean forceRecovery) {
-        final boolean requested = forceRecovery;
-        this.forceRecovery = requested;
-    }
-
-}