You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:18 UTC
[13/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
deleted file mode 100644
index 5b788e2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKVersionedSetOp.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.zk;
-
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-
-import javax.annotation.Nullable;
-
-/**
- * ZooKeeper set-data operation that reports its outcome to an optional listener
- * in terms of a {@link org.apache.bookkeeper.versioning.Version}.
- */
-public class ZKVersionedSetOp extends ZKOp {
-
-    // May be null; in that case commit/abort outcomes are computed but reported to nobody.
-    private final OpListener<Version> listener;
-
-    public ZKVersionedSetOp(Op op,
-                            @Nullable OpListener<Version> opListener) {
-        super(op);
-        this.listener = opListener;
-    }
-
-    /**
-     * Passes the znode version carried by the set-data result to the listener, if any.
-     *
-     * @param opResult result of the committed op; expected to be a {@code SetDataResult}
-     */
-    @Override
-    protected void commitOpResult(OpResult opResult) {
-        assert(opResult instanceof OpResult.SetDataResult);
-        final OpResult.SetDataResult committed = (OpResult.SetDataResult) opResult;
-        if (null == listener) {
-            return;
-        }
-        listener.onCommit(new ZkVersion(committed.getStat().getVersion()));
-    }
-
-    /**
-     * Resolves the abort cause and passes it to the listener, if any.
-     *
-     * @param t        failure reported for the aborted transaction
-     * @param opResult per-op result, or null when none is available
-     */
-    @Override
-    protected void abortOpResult(Throwable t,
-                                 @Nullable OpResult opResult) {
-        // The cause is resolved before the listener check, exactly as before.
-        final Throwable cause = resolveAbortCause(t, opResult);
-        if (null == listener) {
-            return;
-        }
-        listener.onAbort(cause);
-    }
-
-    // Uses t unless the op result carries a non-OK error code, in which case the
-    // matching KeeperException is created from that code.
-    private static Throwable resolveAbortCause(Throwable t, @Nullable OpResult opResult) {
-        if (null == opResult) {
-            return t;
-        }
-        assert (opResult instanceof OpResult.ErrorResult);
-        final int errCode = ((OpResult.ErrorResult) opResult).getErr();
-        if (KeeperException.Code.OK.intValue() == errCode) {
-            return t;
-        }
-        return KeeperException.create(KeeperException.Code.get(errCode));
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
deleted file mode 100644
index 8ef33ea..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.zk;
-
-import com.twitter.distributedlog.ZooKeeperClient;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Watcher Manager to manage watchers.
- * <h3>Metrics</h3>
- * <ul>
- * <li> `total_watches`: total number of watches that are managed by this watcher manager.
- * <li> `num_child_watches`: number of paths that are watched for children changes by this watcher manager.
- * </ul>
- */
-public class ZKWatcherManager implements Watcher {
-
-    static final Logger logger = LoggerFactory.getLogger(ZKWatcherManager.class);
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    /**
-     * Builder collecting the name, ZooKeeper client and stats logger passed to the manager.
-     */
-    public static class Builder {
-
-        private String _name;
-        private StatsLogger _statsLogger;
-        private ZooKeeperClient _zkc;
-
-        public Builder name(String name) {
-            this._name = name;
-            return this;
-        }
-
-        public Builder zkc(ZooKeeperClient zkc) {
-            this._zkc = zkc;
-            return this;
-        }
-
-        public Builder statsLogger(StatsLogger statsLogger) {
-            this._statsLogger = statsLogger;
-            return this;
-        }
-
-        public ZKWatcherManager build() {
-            return new ZKWatcherManager(_name, _zkc, _statsLogger);
-        }
-    }
-
-    private final String name;
-    private final ZooKeeperClient zkc;
-    private final StatsLogger statsLogger;
-    // Gauges and their labels
-    private final Gauge<Number> totalWatchesGauge;
-    private static final String TOTAL_WATCHES_GAUGE_LABEL = "total_watches";
-    private final Gauge<Number> numChildWatchesGauge;
-    private static final String NUM_CHILD_WATCHES_GAUGE_LABEL = "num_child_watches";
-
-    // path -> watchers registered for children changes on that path.
-    // Each watcher set is only read or modified while holding its own monitor.
-    protected final ConcurrentMap<String, Set<Watcher>> childWatches;
-    // Number of watchers currently registered across all paths.
-    protected final AtomicInteger allWatchesGauge;
-
-    private ZKWatcherManager(String name,
-                             ZooKeeperClient zkc,
-                             StatsLogger statsLogger) {
-        this.name = name;
-        this.zkc = zkc;
-        this.statsLogger = statsLogger;
-
-        // watches
-        this.childWatches = new ConcurrentHashMap<String, Set<Watcher>>();
-        this.allWatchesGauge = new AtomicInteger(0);
-
-        // stats
-        totalWatchesGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return allWatchesGauge.get();
-            }
-        };
-        this.statsLogger.registerGauge(TOTAL_WATCHES_GAUGE_LABEL, totalWatchesGauge);
-
-        numChildWatchesGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                return childWatches.size();
-            }
-        };
-
-        this.statsLogger.registerGauge(NUM_CHILD_WATCHES_GAUGE_LABEL, numChildWatchesGauge);
-    }
-
-    /**
-     * Register <i>watcher</i> for children changes on <i>path</i>.
-     *
-     * @param path    path whose children changes the watcher is interested in
-     * @param watcher watcher to fire on events for that path
-     * @return this manager (presumably so the caller can install it as the actual
-     *         ZooKeeper watcher -- confirm against callers)
-     */
-    public Watcher registerChildWatcher(String path, Watcher watcher) {
-        Set<Watcher> watchers = childWatches.get(path);
-        if (null == watchers) {
-            Set<Watcher> newWatchers = new HashSet<Watcher>();
-            Set<Watcher> oldWatchers = childWatches.putIfAbsent(path, newWatchers);
-            watchers = (null == oldWatchers) ? newWatchers : oldWatchers;
-        }
-        synchronized (watchers) {
-            // Re-check the mapping under the lock: a concurrent unregister may have
-            // emptied this set and removed it from the map in the meantime.
-            if (childWatches.get(path) == watchers) {
-                if (watchers.add(watcher)) {
-                    allWatchesGauge.incrementAndGet();
-                }
-            } else {
-                // NOTE(review): in this case the watcher is NOT registered, only a warning
-                // is logged. Behaviour kept as-is; consider retrying against the current set.
-                logger.warn("Watcher set for path {} has been changed while registering child watcher {}.",
-                        path, watcher);
-            }
-        }
-        return this;
-    }
-
-    /**
-     * Unregister <i>watcher</i> from <i>path</i>. When the last watcher of a path is
-     * removed, the path is dropped from this manager.
-     *
-     * @param path             path the watcher was registered on
-     * @param watcher          watcher to remove
-     * @param removeFromServer whether to also issue a best-effort removal of the children
-     *                         watch through the zookeeper client once no watchers remain
-     */
-    public void unregisterChildWatcher(String path, Watcher watcher, boolean removeFromServer) {
-        Set<Watcher> watchers = childWatches.get(path);
-        if (null == watchers) {
-            logger.warn("No watchers found on path {} while unregistering child watcher {}.",
-                    path, watcher);
-            return;
-        }
-        synchronized (watchers) {
-            if (watchers.remove(watcher)) {
-                allWatchesGauge.decrementAndGet();
-            } else {
-                logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path);
-            }
-            if (watchers.isEmpty()) {
-                // best-efforts to remove watches
-                try {
-                    if (null != zkc && removeFromServer) {
-                        zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() {
-                            @Override
-                            public void processResult(int rc, String path, Object ctx) {
-                                if (KeeperException.Code.OK.intValue() == rc) {
-                                    logger.debug("Successfully removed children watches from {}", path);
-                                } else {
-                                    logger.debug("Encountered exception on removing children watches from {}",
-                                            path, KeeperException.create(KeeperException.Code.get(rc)));
-                                }
-                            }
-                        }, null);
-                    }
-                } catch (InterruptedException e) {
-                    // The removal stays best-effort, but the interrupt must not be lost:
-                    // restore the flag so code further up the stack can still observe it.
-                    Thread.currentThread().interrupt();
-                    logger.debug("Encountered exception on removing watches from {}", path, e);
-                } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-                    logger.debug("Encountered exception on removing watches from {}", path, e);
-                }
-                // Conditional remove: only drops the mapping if it still points at this set.
-                childWatches.remove(path, watchers);
-            }
-        }
-    }
-
-    /**
-     * Unregister the two gauges that were registered at construction time.
-     */
-    public void unregisterGauges() {
-        this.statsLogger.unregisterGauge(TOTAL_WATCHES_GAUGE_LABEL, totalWatchesGauge);
-        this.statsLogger.unregisterGauge(NUM_CHILD_WATCHES_GAUGE_LABEL, numChildWatchesGauge);
-    }
-
-    /**
-     * Dispatch a ZooKeeper event: state events (type {@code None}) go to every registered
-     * watcher, children-changed events go to the watchers of the event's path, and all
-     * other event types are ignored.
-     */
-    @Override
-    public void process(WatchedEvent event) {
-        switch (event.getType()) {
-            case None:
-                handleKeeperStateEvent(event);
-                break;
-            case NodeChildrenChanged:
-                handleChildWatchEvent(event);
-                break;
-            default:
-                break;
-        }
-    }
-
-    private void handleKeeperStateEvent(WatchedEvent event) {
-        // Snapshot the watchers under each set's lock, then fire outside of any lock.
-        // Using a set means a watcher registered on several paths is fired only once.
-        Set<Watcher> savedAllWatches = new HashSet<Watcher>(allWatchesGauge.get());
-        for (Set<Watcher> watcherSet : childWatches.values()) {
-            synchronized (watcherSet) {
-                savedAllWatches.addAll(watcherSet);
-            }
-        }
-        for (Watcher watcher : savedAllWatches) {
-            watcher.process(event);
-        }
-    }
-
-    private void handleChildWatchEvent(WatchedEvent event) {
-        String path = event.getPath();
-        if (null == path) {
-            logger.warn("Received zookeeper watch event with null path : {}", event);
-            return;
-        }
-        Set<Watcher> watchers = childWatches.get(path);
-        if (null == watchers) {
-            return;
-        }
-        // Copy under the lock so the watchers can be fired without holding it.
-        Set<Watcher> watchersToFire;
-        synchronized (watchers) {
-            watchersToFire = new HashSet<Watcher>(watchers.size());
-            watchersToFire.addAll(watchers);
-        }
-        for (Watcher watcher : watchersToFire) {
-            watcher.process(event);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
new file mode 100644
index 0000000..1d96f0e
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link InputStream} view over the payloads of the records of a log, read in order
+ * through a {@link DistributedLogManager}. Byte positions in the stream are tied to
+ * record transaction ids (see {@code LogRecordWithInputStream#getOffset()}).
+ */
+public class AppendOnlyStreamReader extends InputStream {
+    static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamReader.class);
+
+    private LogRecordWithInputStream currentLogRecord = null;
+    private final DistributedLogManager dlm;
+    private LogReader reader;
+    private long currentPosition;
+    private static final int SKIP_BUFFER_SIZE = 512;
+
+    // Cache the input stream for a log record.
+    private static class LogRecordWithInputStream {
+        private final InputStream payloadStream;
+        private final LogRecordWithDLSN logRecord;
+
+        LogRecordWithInputStream(LogRecordWithDLSN logRecord) {
+            Preconditions.checkNotNull(logRecord);
+
+            LOG.debug("Got record dlsn = {}, txid = {}, len = {}",
+                    new Object[] {logRecord.getDlsn(), logRecord.getTransactionId(), logRecord.getPayload().length});
+
+            this.logRecord = logRecord;
+            this.payloadStream = logRecord.getPayLoadInputStream();
+        }
+
+        InputStream getPayLoadInputStream() {
+            return payloadStream;
+        }
+
+        LogRecordWithDLSN getLogRecord() {
+            return logRecord;
+        }
+
+        // The last txid of the log record is the position of the next byte in the stream.
+        // Subtract length to get starting offset.
+        long getOffset() {
+            return logRecord.getTransactionId() - logRecord.getPayload().length;
+        }
+    }
+
+    /**
+     * Construct ledger input stream
+     *
+     * @param dlm the Distributed Log Manager to access the stream
+     */
+    AppendOnlyStreamReader(DistributedLogManager dlm)
+        throws IOException {
+        this.dlm = dlm;
+        reader = dlm.getInputStream(0);
+        currentPosition = 0;
+    }
+
+    /**
+     * Get input stream representing next entry in the
+     * ledger.
+     *
+     * @return input stream, or null if no more entries
+     */
+    private LogRecordWithInputStream nextLogRecord() throws IOException {
+        return nextLogRecord(reader);
+    }
+
+    private static LogRecordWithInputStream nextLogRecord(LogReader reader) throws IOException {
+        LogRecordWithDLSN record = reader.readNext(false);
+        if (null == record) {
+            // NOTE(review): a second attempt is made before giving up, as in the original
+            // code; presumably the first read can come back empty while the reader is
+            // still positioning -- confirm.
+            record = reader.readNext(false);
+        }
+        if (null == record) {
+            LOG.debug("No record");
+            return null;
+        }
+        return new LogRecordWithInputStream(record);
+    }
+
+    /**
+     * Read a single byte.
+     *
+     * @return the byte as a value in the range 0-255, or -1 if no byte could be read
+     */
+    @Override
+    public int read() throws IOException {
+        byte[] b = new byte[1];
+        if (read(b, 0, 1) != 1) {
+            return -1;
+        } else {
+            // Mask to an unsigned value: returning the raw (signed) byte would turn
+            // 0x80-0xFF into negative numbers and make 0xFF look like end of stream.
+            return b[0] & 0xff;
+        }
+    }
+
+    /**
+     * Read up to <i>len</i> bytes, crossing record boundaries as needed.
+     *
+     * @return number of bytes read; this is less than <i>len</i> (possibly 0, never -1)
+     *         when no further record is available
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int read = 0;
+        if (currentLogRecord == null) {
+            currentLogRecord = nextLogRecord();
+            if (currentLogRecord == null) {
+                return read;
+            }
+        }
+
+        while (read < len) {
+            int thisread = currentLogRecord.getPayLoadInputStream().read(b, off + read, (len - read));
+            if (thisread == -1) {
+                // Current record's payload is exhausted; move on to the next record.
+                currentLogRecord = nextLogRecord();
+                if (currentLogRecord == null) {
+                    return read;
+                }
+            } else {
+                LOG.debug("Offset saved = {}, persisted = {}",
+                    currentPosition, currentLogRecord.getLogRecord().getTransactionId());
+                currentPosition += thisread;
+                read += thisread;
+            }
+        }
+        return read;
+    }
+
+    /**
+     * Position the reader at the given offset. If we fail to skip to the desired position
+     * and don't hit end of stream, return false.
+     *
+     * @param position byte position to move to
+     * @return true if the reader is now at <i>position</i>, false otherwise
+     * @throws org.apache.distributedlog.exceptions.EndOfStreamException if we attempt to
+     *         skip past the end of the stream.
+     */
+    public boolean skipTo(long position) throws IOException {
+
+        // No need to skip anywhere.
+        if (position == position()) {
+            return true;
+        }
+
+        LogReader skipReader = dlm.getInputStream(position);
+        LogRecordWithInputStream logRecord = null;
+        try {
+            logRecord = nextLogRecord(skipReader);
+        } catch (IOException ex) {
+            skipReader.close();
+            throw ex;
+        }
+
+        if (null == logRecord) {
+            // Nothing readable at the target: the probe reader is not going to be
+            // adopted, so close it here instead of leaking it.
+            skipReader.close();
+            return false;
+        }
+
+        // We may end up with a reader positioned *before* the requested position if
+        // we're near the tail and the writer is still active, or if the desired position
+        // is not at a log record payload boundary.
+        // Transaction ID gives us the starting position of the log record. Read ahead
+        // if necessary.
+        currentPosition = logRecord.getOffset();
+        currentLogRecord = logRecord;
+        LogReader oldReader = reader;
+        reader = skipReader;
+
+        // Close the oldreader after swapping AppendOnlyStreamReader state. Close may fail
+        // and we need to make sure it leaves AppendOnlyStreamReader in a consistent state.
+        oldReader.close();
+
+        byte[] skipBuffer = new byte[SKIP_BUFFER_SIZE];
+        while (currentPosition < position) {
+            long bytesToRead = Math.min(position - currentPosition, SKIP_BUFFER_SIZE);
+            long bytesRead = read(skipBuffer, 0, (int)bytesToRead);
+            if (bytesRead < bytesToRead) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public long position() {
+        return currentPosition;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
new file mode 100644
index 0000000..8278c68
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.FutureUtils;
+import com.twitter.util.Await;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AppendOnlyStreamWriter implements Closeable {
+ static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamWriter.class);
+
+ // Use a 1-length array to satisfy Java's inner class reference rules. Use primitive
+ // type because synchronized block is needed anyway.
+ final long[] syncPos = new long[1];
+ BKAsyncLogWriter logWriter;
+ long requestPos = 0;
+
+ public AppendOnlyStreamWriter(BKAsyncLogWriter logWriter, long pos) {
+ LOG.debug("initialize at position {}", pos);
+ this.logWriter = logWriter;
+ this.syncPos[0] = pos;
+ this.requestPos = pos;
+ }
+
+ public Future<DLSN> write(byte[] data) {
+ requestPos += data.length;
+ Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
+ return writeResult.addEventListener(new WriteCompleteListener(requestPos));
+ }
+
+ public void force(boolean metadata) throws IOException {
+ long pos = 0;
+ try {
+ pos = Await.result(logWriter.flushAndCommit());
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (Exception ex) {
+ LOG.error("unexpected exception in AppendOnlyStreamWriter.force ", ex);
+ throw new UnexpectedException("unexpected exception in AppendOnlyStreamWriter.force", ex);
+ }
+ synchronized (syncPos) {
+ syncPos[0] = pos;
+ }
+ }
+
+ public long position() {
+ synchronized (syncPos) {
+ return syncPos[0];
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ logWriter.closeAndComplete();
+ }
+
+ public void markEndOfStream() throws IOException {
+ try {
+ Await.result(logWriter.markEndOfStream());
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (Exception ex) {
+ throw new UnexpectedException("Mark end of stream hit unexpected exception", ex);
+ }
+ }
+
+ class WriteCompleteListener implements FutureEventListener<DLSN> {
+ private final long position;
+ public WriteCompleteListener(long position) {
+ this.position = position;
+ }
+ @Override
+ public void onSuccess(DLSN response) {
+ synchronized (syncPos) {
+ if (position > syncPos[0]) {
+ syncPos[0] = position;
+ }
+ }
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ // Handled at the layer above
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
new file mode 100644
index 0000000..e3ace05
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.io.AsyncCloseable;
+import com.twitter.util.Future;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reader that returns records of a log stream through futures.
+ */
+public interface AsyncLogReader extends AsyncCloseable {
+
+    /**
+     * Name of the stream this reader reads from.
+     *
+     * @return stream name.
+     */
+    String getStreamName();
+
+    /**
+     * Read the next record from the log stream.
+     *
+     * @return a promise that, when satisfied, contains the log record with its DLSN.
+     */
+    Future<LogRecordWithDLSN> readNext();
+
+    /**
+     * Read up to <i>numEntries</i> entries. This is a best-effort call: the future is
+     * satisfied as soon as a non-empty list of entries is available, without waiting
+     * for exactly <i>numEntries</i> of them.
+     *
+     * @param numEntries
+     *          num entries
+     * @return a promise that, when satisfied, contains a non-empty list of records with their DLSN.
+     */
+    Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
+
+    /**
+     * Read up to <i>numEntries</i> entries within a given <i>waitTime</i>.
+     * <p>
+     * The future is satisfied when either <i>numEntries</i> entries have been read or
+     * <i>waitTime</i> has elapsed. The one exception: if no new entries are written within
+     * <i>waitTime</i>, the call keeps waiting until new entries are available.
+     *
+     * @param numEntries
+     *          max entries to return
+     * @param waitTime
+     *          maximum wait time if there are entries already for read
+     * @param timeUnit
+     *          wait time unit
+     * @return a promise that, when satisfied, contains a non-empty list of records with their DLSN.
+     */
+    Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
new file mode 100644
index 0000000..53b393b
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.io.AsyncAbortable;
+import org.apache.distributedlog.io.AsyncCloseable;
+import com.twitter.util.Future;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * Writer that appends records to a log stream and reports results through futures.
+ */
+public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable {
+
+    /**
+     * Last committed transaction id.
+     *
+     * @return last committed transaction id.
+     */
+    long getLastTxId();
+
+    /**
+     * Write a single log record to the stream.
+     *
+     * @param record single log record
+     * @return a Future holding the DLSN if the record was successfully written,
+     *         or an exception if the write fails
+     */
+    Future<DLSN> write(LogRecord record);
+
+    /**
+     * Write log records to the stream in bulk. The result list has one future per input
+     * record, in the same order as the input; records are written in that order too.
+     *
+     * @param record set of log records
+     * @return a Future holding a list of Future DLSNs if the records were successfully written,
+     *         or an exception if the operation fails.
+     */
+    Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);
+
+    /**
+     * Truncate the log until <i>dlsn</i>.
+     *
+     * @param dlsn
+     *          dlsn to truncate until.
+     * @return a Future indicating whether the operation succeeded or not, or an exception
+     *         if the truncation fails.
+     */
+    Future<Boolean> truncate(DLSN dlsn);
+
+    /**
+     * Name of the stream this writer writes data to.
+     *
+     * @return stream name.
+     */
+    String getStreamName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
new file mode 100644
index 0000000..c12bd10
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+/**
+ * Callback through which a background activity reports errors and completed operations.
+ */
+public interface AsyncNotification {
+
+    /**
+     * Triggered when the background activity encounters an exception.
+     *
+     * @param reason the exception that was encountered.
+     */
+    void notifyOnError(Throwable reason);
+
+    /**
+     * Triggered when the background activity completes an operation.
+     */
+    void notifyOnOperationComplete();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
new file mode 100644
index 0000000..4a2ef30
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.io.Abortable;
+import org.apache.distributedlog.io.Abortables;
+import org.apache.distributedlog.io.AsyncAbortable;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.util.Utils;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.AbstractFunction1;
+import scala.runtime.BoxedUnit;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable {
+ static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class);
+
+ // Configuration and owning log manager, all supplied through the constructor.
+ protected final DistributedLogConfiguration conf;
+ private final DynamicDistributedLogConfiguration dynConf;
+ protected final BKDistributedLogManager bkDistributedLogManager;
+
+ // States
+ // Non-null once closeNoThrow() or asyncAbort() has been invoked; also used as the "closed" marker.
+ private Promise<Void> closePromise = null;
+ // Test hook: when true, a roll to a new log segment is requested regardless of the handler's policy.
+ private volatile boolean forceRolling = false;
+ // Test hook: when true, asyncGetLedgerWriter(true) closes the cached writer and completes its segment.
+ private boolean forceRecovery = false;
+
+ // Truncation Related
+ // Most recently scheduled purge; a new purge is only scheduled once this one is defined.
+ private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null;
+ // Optional override of the retention-derived cut-off timestamp (set by tests).
+ @VisibleForTesting
+ private Long minTimestampToKeepOverride = null;
+
+ // Log Segment Writers
+ // Writer for the current log segment and a pre-satisfied future wrapping it (set/cleared together).
+ protected BKLogSegmentWriter segmentWriter = null;
+ protected Future<BKLogSegmentWriter> segmentWriterFuture = null;
+ // Writer for a newly allocated segment that has not yet replaced the current one.
+ protected BKLogSegmentWriter allocatedSegmentWriter = null;
+ // Lazily created by createAndCacheWriteHandler().
+ protected BKLogWriteHandler writeHandler = null;
+
+ BKAbstractLogWriter(DistributedLogConfiguration conf,
+ DynamicDistributedLogConfiguration dynConf,
+ BKDistributedLogManager bkdlm) {
+ this.conf = conf;
+ this.dynConf = dynConf;
+ this.bkDistributedLogManager = bkdlm;
+ LOG.debug("Initial retention period for {} : {}", bkdlm.getStreamName(),
+ TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS));
+ }
+
+ // manage write handler
+
+    /**
+     * Reads the cached write handler while holding this writer's monitor.
+     *
+     * @return the cached write handler, or {@code null} if none has been cached
+     */
+    protected BKLogWriteHandler getCachedWriteHandler() {
+        synchronized (this) {
+            return writeHandler;
+        }
+    }
+
+ protected BKLogWriteHandler getWriteHandler() throws IOException {
+ BKLogWriteHandler writeHandler = createAndCacheWriteHandler();
+ writeHandler.checkMetadataException();
+ return writeHandler;
+ }
+
+ protected BKLogWriteHandler createAndCacheWriteHandler()
+ throws IOException {
+ synchronized (this) {
+ if (writeHandler != null) {
+ return writeHandler;
+ }
+ }
+ // This code path will be executed when the handler is not set or has been closed
+ // due to forceRecovery during testing
+ BKLogWriteHandler newHandler =
+ FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false));
+ boolean success = false;
+ try {
+ synchronized (this) {
+ if (writeHandler == null) {
+ writeHandler = newHandler;
+ success = true;
+ }
+ return writeHandler;
+ }
+ } finally {
+ if (!success) {
+ newHandler.asyncAbort();
+ }
+ }
+ }
+
+ // manage log segment writers
+
+    /**
+     * Reads the cached log segment writer under this writer's monitor.
+     *
+     * @return the cached segment writer, or {@code null} if none is cached
+     */
+    protected BKLogSegmentWriter getCachedLogWriter() {
+        synchronized (this) {
+            return segmentWriter;
+        }
+    }
+
+    /**
+     * Reads the future wrapping the cached log segment writer under this writer's monitor.
+     *
+     * @return the cached future, or {@code null} if no writer is cached
+     */
+    protected Future<BKLogSegmentWriter> getCachedLogWriterFuture() {
+        synchronized (this) {
+            return segmentWriterFuture;
+        }
+    }
+
+    /**
+     * Caches the given segment writer together with an already-satisfied future for it.
+     *
+     * @param logWriter the segment writer to cache
+     */
+    protected void cacheLogWriter(BKLogSegmentWriter logWriter) {
+        synchronized (this) {
+            segmentWriter = logWriter;
+            segmentWriterFuture = Future.value(logWriter);
+        }
+    }
+
+ protected synchronized BKLogSegmentWriter removeCachedLogWriter() {
+ try {
+ return segmentWriter;
+ } finally {
+ segmentWriter = null;
+ segmentWriterFuture = null;
+ }
+ }
+
+    /**
+     * Reads the allocated (not yet current) segment writer under this writer's monitor.
+     *
+     * @return the allocated segment writer, or {@code null} if none is cached
+     */
+    protected BKLogSegmentWriter getAllocatedLogWriter() {
+        synchronized (this) {
+            return allocatedSegmentWriter;
+        }
+    }
+
+    /**
+     * Remembers a newly allocated segment writer.
+     *
+     * @param logWriter the allocated segment writer to cache
+     */
+    protected void cacheAllocatedLogWriter(BKLogSegmentWriter logWriter) {
+        synchronized (this) {
+            allocatedSegmentWriter = logWriter;
+        }
+    }
+
+ protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() {
+ try {
+ return allocatedSegmentWriter;
+ } finally {
+ allocatedSegmentWriter = null;
+ }
+ }
+
+ private Future<Void> asyncCloseAndComplete(boolean shouldThrow) {
+ BKLogSegmentWriter segmentWriter = getCachedLogWriter();
+ BKLogWriteHandler writeHandler = getCachedWriteHandler();
+ if (null != segmentWriter && null != writeHandler) {
+ cancelTruncation();
+ Promise<Void> completePromise = new Promise<Void>();
+ asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow);
+ return completePromise;
+ } else {
+ return closeNoThrow();
+ }
+ }
+
+    /**
+     * Completes and closes the given log segment, then closes this writer and
+     * satisfies {@code completePromise}.
+     *
+     * <p>On successful completion the cached segment writer is removed. The
+     * promise fails only when completion failed and {@code shouldThrow} is set;
+     * in every other case it is satisfied with {@code null}.
+     *
+     * @param segmentWriter   segment writer to complete
+     * @param writeHandler    handler used to complete the segment
+     * @param completePromise promise to satisfy once closing has finished
+     * @param shouldThrow     whether a completion failure is propagated to the promise
+     */
+    private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter,
+                                       final BKLogWriteHandler writeHandler,
+                                       final Promise<Void> completePromise,
+                                       final boolean shouldThrow) {
+        writeHandler.completeAndCloseLogSegment(segmentWriter)
+                .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+                    @Override
+                    public void onSuccess(LogSegmentMetadata segment) {
+                        removeCachedLogWriter();
+                        finish(null);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable cause) {
+                        LOG.error("Completing Log segments encountered exception", cause);
+                        finish(cause);
+                    }
+
+                    // close the writer, then report the outcome of the completion
+                    private void finish(final Throwable failure) {
+                        closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() {
+                            @Override
+                            public BoxedUnit apply() {
+                                final boolean propagate = shouldThrow && null != failure;
+                                if (propagate) {
+                                    FutureUtils.setException(completePromise, failure);
+                                } else {
+                                    FutureUtils.setValue(completePromise, null);
+                                }
+                                return BoxedUnit.UNIT;
+                            }
+                        });
+                    }
+                });
+    }
+
+ @VisibleForTesting
+ void closeAndComplete() throws IOException {
+ FutureUtils.result(asyncCloseAndComplete(true));
+ }
+
+ protected Future<Void> asyncCloseAndComplete() {
+ return asyncCloseAndComplete(true);
+ }
+
+    /**
+     * Blocking variant of {@link #asyncClose()}.
+     *
+     * @throws IOException if waiting on the close future fails
+     */
+    @Override
+    public void close() throws IOException {
+        final Future<Void> pending = asyncClose();
+        FutureUtils.result(pending);
+    }
+
+    /**
+     * Completes the current segment and closes the writer without propagating
+     * a completion failure to the returned future.
+     *
+     * @return future satisfied once the close sequence has finished
+     */
+    @Override
+    public Future<Void> asyncClose() {
+        final boolean shouldThrow = false;
+        return asyncCloseAndComplete(shouldThrow);
+    }
+
+ /**
+ * Close the writer and release all the underlying resources
+ */
+ protected Future<Void> closeNoThrow() {
+ Promise<Void> closeFuture;
+ synchronized (this) {
+ if (null != closePromise) {
+ return closePromise;
+ }
+ closeFuture = closePromise = new Promise<Void>();
+ }
+ cancelTruncation();
+ Utils.closeSequence(bkDistributedLogManager.getScheduler(),
+ true, /** ignore close errors **/
+ getCachedLogWriter(),
+ getAllocatedLogWriter(),
+ getCachedWriteHandler()
+ ).proxyTo(closeFuture);
+ return closeFuture;
+ }
+
+    /**
+     * Blocking variant of {@link #asyncAbort()}.
+     *
+     * @throws IOException if waiting on the abort future fails
+     */
+    @Override
+    public void abort() throws IOException {
+        final Future<Void> pending = asyncAbort();
+        FutureUtils.result(pending);
+    }
+
+    /**
+     * Aborts the cached segment writer, the allocated segment writer and the
+     * write handler, in that order.
+     *
+     * <p>Shares its promise with {@link #closeNoThrow()}: whichever of the two is
+     * invoked first wins, and later calls return that same promise.
+     *
+     * @return future satisfied once the abort sequence has finished
+     */
+    @Override
+    public Future<Void> asyncAbort() {
+        final Promise<Void> promise;
+        synchronized (this) {
+            if (null != closePromise) {
+                return closePromise;
+            }
+            promise = new Promise<Void>();
+            closePromise = promise;
+        }
+        cancelTruncation();
+        Abortables.abortSequence(
+                bkDistributedLogManager.getScheduler(),
+                getCachedLogWriter(),
+                getAllocatedLogWriter(),
+                getCachedWriteHandler()
+        ).proxyTo(promise);
+        return promise;
+    }
+
+ // used by sync writer
+ protected BKLogSegmentWriter getLedgerWriter(final long startTxId,
+ final boolean allowMaxTxID)
+ throws IOException {
+ Future<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true);
+ BKLogSegmentWriter logSegmentWriter = null;
+ if (null != logSegmentWriterFuture) {
+ logSegmentWriter = FutureUtils.result(logSegmentWriterFuture);
+ }
+ if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) {
+ logSegmentWriter = FutureUtils.result(rollLogSegmentIfNecessary(
+ logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID));
+ }
+ return logSegmentWriter;
+ }
+
+ /**
+ * Returns a future for the cached log segment writer; used by the async writer.
+ *
+ * <p>Returns {@code null} (not a future) when no writer or writer future is cached.
+ * When {@code resetOnError} is set and either the cached writer is in error or
+ * force-recovery is enabled, the writer is aborted (error case) or closed
+ * (force-recovery case), removed from the cache, and the returned future yields
+ * {@code null} so the caller starts a new log segment. In the force-recovery case
+ * the old log segment is additionally completed through the write handler first.
+ *
+ * @param resetOnError whether a writer in error (or force-recovery) should be torn down
+ * @return the cached writer future, a future yielding {@code null} after a reset,
+ * or {@code null} if nothing is cached
+ */
+ synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) {
+ final BKLogSegmentWriter ledgerWriter = getCachedLogWriter();
+ Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture();
+ if (null == ledgerWriterFuture || null == ledgerWriter) {
+ return null;
+ }
+
+ // Handle the case where the last call to write actually caused an error in the log
+ if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) {
+ // Close the ledger writer so that we will recover and start a new log segment
+ Future<Void> closeFuture;
+ if (ledgerWriter.isLogSegmentInError()) {
+ // a writer in error is aborted rather than closed
+ closeFuture = ledgerWriter.asyncAbort();
+ } else {
+ closeFuture = ledgerWriter.asyncClose();
+ }
+ return closeFuture.flatMap(
+ new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() {
+ @Override
+ public Future<BKLogSegmentWriter> apply(Void result) {
+ // drop the old writer from the cache before deciding what to hand back
+ removeCachedLogWriter();
+
+ if (ledgerWriter.isLogSegmentInError()) {
+ return Future.value(null);
+ }
+
+ BKLogWriteHandler writeHandler;
+ try {
+ writeHandler = getWriteHandler();
+ } catch (IOException e) {
+ return Future.exception(e);
+ }
+ if (null != writeHandler && forceRecovery) {
+ // force-recovery: complete the old segment, then yield null
+ return writeHandler.completeAndCloseLogSegment(ledgerWriter)
+ .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() {
+ @Override
+ public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) {
+ return null;
+ }
+ });
+ } else {
+ return Future.value(null);
+ }
+ }
+ });
+ } else {
+ return ledgerWriterFuture;
+ }
+ }
+
+ boolean shouldStartNewSegment(BKLogSegmentWriter ledgerWriter) throws IOException {
+ BKLogWriteHandler writeHandler = getWriteHandler();
+ return null == ledgerWriter || writeHandler.shouldStartNewSegment(ledgerWriter) || forceRolling;
+ }
+
+ private void truncateLogSegmentsIfNecessary(BKLogWriteHandler writeHandler) {
+ boolean truncationEnabled = false;
+
+ long minTimestampToKeep = 0;
+
+ long retentionPeriodInMillis = TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS);
+ if (retentionPeriodInMillis > 0) {
+ minTimestampToKeep = Utils.nowInMillis() - retentionPeriodInMillis;
+ truncationEnabled = true;
+ }
+
+ if (null != minTimestampToKeepOverride) {
+ minTimestampToKeep = minTimestampToKeepOverride;
+ truncationEnabled = true;
+ }
+
+ // skip scheduling if there is task that's already running
+ //
+ synchronized (this) {
+ if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
+ lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
+ }
+ }
+ }
+
+    /**
+     * Recovers incomplete log segments, then starts a new log segment and caches
+     * its writer on success.
+     *
+     * @param writeHandler handler used for recovery and for starting the segment
+     * @param startTxId    transaction id the new segment starts from
+     * @param allowMaxTxID passed through to the handler when starting the segment
+     * @return future of the writer for the newly started segment
+     */
+    private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
+                                                               final long startTxId,
+                                                               final boolean allowMaxTxID) {
+        // caches the writer once the segment has been started
+        final AbstractFunction1<BKLogSegmentWriter, BoxedUnit> cacheStartedWriter =
+                new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply(BKLogSegmentWriter startedWriter) {
+                        cacheLogWriter(startedWriter);
+                        return BoxedUnit.UNIT;
+                    }
+                };
+        return writeHandler.recoverIncompleteLogSegments()
+                .flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() {
+                    @Override
+                    public Future<BKLogSegmentWriter> apply(Long recoveredLastTxId) {
+                        return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)
+                                .onSuccess(cacheStartedWriter);
+                    }
+                });
+    }
+
+    /**
+     * Rolls to a new log segment if a rolling permit can be obtained.
+     *
+     * <p>Without a permit the old writer is kept. With a permit, a
+     * {@link LockingException} or a retryable {@link ZKException} during the roll
+     * disables further permits and falls back to the old writer; any other failure
+     * is propagated. The permit is released in every case.
+     *
+     * @param oldSegmentWriter writer of the segment being rolled away from
+     * @param writeHandler     handler performing the roll
+     * @param startTxId        transaction id the new segment starts from
+     * @param bestEffort       whether failing to allocate a new segment is tolerated
+     * @param allowMaxTxID     passed through to the handler when starting the segment
+     * @return future of the writer to use after the roll attempt
+     */
+    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(
+            final BKLogSegmentWriter oldSegmentWriter,
+            final BKLogWriteHandler writeHandler,
+            final long startTxId,
+            final boolean bestEffort,
+            final boolean allowMaxTxID) {
+        final PermitManager.Permit switchPermit =
+                bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit();
+        if (!switchPermit.isAllowed()) {
+            // rolling is currently not permitted: keep writing to the old segment
+            bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit);
+            return Future.value(oldSegmentWriter);
+        }
+
+        final Future<BKLogSegmentWriter> rolled = closeOldLogSegmentAndStartNewOne(
+                oldSegmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID);
+
+        return rolled.rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() {
+            @Override
+            public Future<BKLogSegmentWriter> apply(Throwable cause) {
+                if (cause instanceof LockingException) {
+                    LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ",
+                            writeHandler.getFullyQualifiedName(), cause);
+                    bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
+                    return Future.value(oldSegmentWriter);
+                }
+                if (cause instanceof ZKException) {
+                    final ZKException zke = (ZKException) cause;
+                    if (ZKException.isRetryableZKException(zke)) {
+                        LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." +
+                                " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(),
+                                zke.getKeeperExceptionCode());
+                        bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
+                        return Future.value(oldSegmentWriter);
+                    }
+                }
+                return Future.exception(cause);
+            }
+        }).ensure(new AbstractFunction0<BoxedUnit>() {
+            @Override
+            public BoxedUnit apply() {
+                bkDistributedLogManager.getLogSegmentRollingPermitManager()
+                        .releasePermit(switchPermit);
+                return BoxedUnit.UNIT;
+            }
+        });
+    }
+
+    /**
+     * Allocates a new log segment (unless one is already allocated), then
+     * completes the old segment and switches to the new writer.
+     *
+     * <p>If the handler yields no new writer, the old writer is kept when
+     * {@code bestEffort} is set; otherwise the returned future fails with an
+     * {@link UnexpectedException}.
+     *
+     * @param oldSegmentWriter writer of the segment being rolled away from
+     * @param writeHandler     handler used to start the new segment
+     * @param startTxId        transaction id the new segment starts from
+     * @param bestEffort       whether failing to allocate a new segment is tolerated
+     * @param allowMaxTxID     passed through to the handler when starting the segment
+     * @return future of the writer to use after the switch
+     */
+    private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(
+            final BKLogSegmentWriter oldSegmentWriter,
+            final BKLogWriteHandler writeHandler,
+            final long startTxId,
+            final boolean bestEffort,
+            final boolean allowMaxTxID) {
+        // we switch only when we could allocate a new log segment.
+        final BKLogSegmentWriter preAllocatedWriter = getAllocatedLogWriter();
+        if (null != preAllocatedWriter) {
+            return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, preAllocatedWriter);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Allocating a new log segment from {} for {}.", startTxId,
+                    writeHandler.getFullyQualifiedName());
+        }
+        return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID)
+                .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
+                    @Override
+                    public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter allocatedWriter) {
+                        if (null == allocatedWriter) {
+                            if (bestEffort) {
+                                return Future.value(oldSegmentWriter);
+                            }
+                            // This branch is only reached for non-best-effort rolling; the
+                            // message previously claimed the opposite.
+                            return Future.exception(
+                                    new UnexpectedException("StartLogSegment returns null for non-bestEffort rolling"));
+                        }
+                        cacheAllocatedLogWriter(allocatedWriter);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Allocated a new log segment from {} for {}.", startTxId,
+                                    writeHandler.getFullyQualifiedName());
+                        }
+                        return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, allocatedWriter);
+                    }
+                });
+    }
+
+ private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(
+ BKLogSegmentWriter oldSegmentWriter,
+ final BKLogSegmentWriter newSegmentWriter) {
+ final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>();
+ // complete the old log segment
+ writeHandler.completeAndCloseLogSegment(oldSegmentWriter)
+ .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+
+ @Override
+ public void onSuccess(LogSegmentMetadata value) {
+ cacheLogWriter(newSegmentWriter);
+ removeAllocatedLogWriter();
+ FutureUtils.setValue(completePromise, newSegmentWriter);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ FutureUtils.setException(completePromise, cause);
+ }
+ });
+ return completePromise;
+ }
+
+    /**
+     * Starts or rolls the log segment as needed.
+     *
+     * <p>With no current writer a new segment is started; with a writer that is
+     * due for rolling (or when rolling is forced) the old segment is closed and a
+     * new one started under a permit; otherwise the current writer is kept.
+     * Truncation of old segments is triggered whenever the resulting writer
+     * differs from {@code segmentWriter}.
+     *
+     * @param segmentWriter current segment writer, may be {@code null}
+     * @param startTxId     transaction id the new segment would start from
+     * @param bestEffort    whether failing to allocate a new segment is tolerated
+     * @param allowMaxTxID  passed through to the handler when starting a segment
+     * @return future of the writer to use; failed if the write handler cannot be obtained
+     */
+    protected synchronized Future<BKLogSegmentWriter> rollLogSegmentIfNecessary(
+            final BKLogSegmentWriter segmentWriter,
+            long startTxId,
+            boolean bestEffort,
+            boolean allowMaxTxID) {
+        final BKLogWriteHandler handler;
+        try {
+            handler = getWriteHandler();
+        } catch (IOException e) {
+            return Future.exception(e);
+        }
+
+        final Future<BKLogSegmentWriter> rolled;
+        if (null == segmentWriter) {
+            rolled = asyncStartNewLogSegment(handler, startTxId, allowMaxTxID);
+        } else if (handler.shouldStartNewSegment(segmentWriter) || forceRolling) {
+            rolled = closeOldLogSegmentAndStartNewOneWithPermit(
+                    segmentWriter, handler, startTxId, bestEffort, allowMaxTxID);
+        } else {
+            rolled = Future.value(segmentWriter);
+        }
+
+        return rolled.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() {
+            @Override
+            public BKLogSegmentWriter apply(BKLogSegmentWriter resultingWriter) {
+                // only a switch to a different writer triggers truncation
+                if (segmentWriter != resultingWriter) {
+                    truncateLogSegmentsIfNecessary(handler);
+                }
+                return resultingWriter;
+            }
+        });
+    }
+
+ protected synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
+ if (null != closePromise) {
+ LOG.error("Executing " + operation + " on already closed Log Writer");
+ throw new AlreadyClosedException("Executing " + operation + " on already closed Log Writer");
+ }
+ }
+
+ /**
+ * Test hook: forces (or stops forcing) a roll to a new log segment.
+ *
+ * @param forceRolling whether rolling should be forced
+ */
+ @VisibleForTesting
+ public void setForceRolling(boolean forceRolling) {
+ this.forceRolling = forceRolling;
+ }
+
+    /**
+     * Test hook: overrides the cut-off timestamp used when truncating old log segments.
+     *
+     * @param minTimestampToKeepOverride the cut-off to use, or {@code null} to clear the override
+     */
+    @VisibleForTesting
+    public void overRideMinTimeStampToKeep(Long minTimestampToKeepOverride) {
+        synchronized (this) {
+            this.minTimestampToKeepOverride = minTimestampToKeepOverride;
+        }
+    }
+
+    /**
+     * Cancels the most recently scheduled truncation, if any, and forgets it.
+     */
+    protected synchronized void cancelTruncation() {
+        if (null == lastTruncationAttempt) {
+            return;
+        }
+        FutureUtils.cancel(lastTruncationAttempt);
+        lastTruncationAttempt = null;
+    }
+
+    /**
+     * Test hook: enables or disables forced recovery of the cached segment writer.
+     *
+     * @param forceRecovery whether recovery should be forced
+     */
+    @VisibleForTesting
+    public void setForceRecovery(boolean forceRecovery) {
+        synchronized (this) {
+            this.forceRecovery = forceRecovery;
+        }
+    }
+
+}