You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:35 UTC
[30/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
deleted file mode 100644
index e798a0f..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-public class DistributedLogConstants {
- public static final byte[] EMPTY_BYTES = new byte[0];
- public static final String SCHEME_PREFIX = "distributedlog";
- public static final String BACKEND_BK = "bk";
- public static final long INVALID_TXID = -999;
- public static final long EMPTY_LOGSEGMENT_TX_ID = -99;
- public static final long MAX_TXID = Long.MAX_VALUE;
- public static final long SMALL_LOGSEGMENT_THRESHOLD = 10;
- public static final int LOGSEGMENT_NAME_VERSION = 1;
- public static final int FUTURE_TIMEOUT_IMMEDIATE = 0;
- public static final int FUTURE_TIMEOUT_INFINITE = -1;
- public static final long LOCK_IMMEDIATE = FUTURE_TIMEOUT_IMMEDIATE;
- public static final long LOCK_TIMEOUT_INFINITE = FUTURE_TIMEOUT_INFINITE;
- public static final long LOCK_OP_TIMEOUT_DEFAULT = 120;
- public static final long LOCK_REACQUIRE_TIMEOUT_DEFAULT = 120;
- public static final String UNKNOWN_CLIENT_ID = "Unknown-ClientId";
- public static final int LOCAL_REGION_ID = 0;
- public static final long LOGSEGMENT_DEFAULT_STATUS = 0;
- public static final long UNASSIGNED_LOGSEGMENT_SEQNO = 0;
- public static final long UNASSIGNED_SEQUENCE_ID = -1L;
- public static final long FIRST_LOGSEGMENT_SEQNO = 1;
- public static final long UNRESOLVED_LEDGER_ID = -1;
- public static final long LATENCY_WARN_THRESHOLD_IN_MILLIS = TimeUnit.SECONDS.toMillis(1);
- public static final int DL_INTERRUPTED_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 1;
- public static final int ZK_CONNECTION_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 2;
-
- public static final String ALLOCATION_POOL_NODE = ".allocation_pool";
- // log segment prefix
- public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress";
- public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
- public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
- static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
- static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
-
- // An ACL that gives all permissions to node creators and read permissions only to everyone else.
- public static final List<ACL> EVERYONE_READ_CREATOR_ALL =
- ImmutableList.<ACL>builder()
- .addAll(Ids.CREATOR_ALL_ACL)
- .addAll(Ids.READ_ACL_UNSAFE)
- .build();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
deleted file mode 100644
index 34cfb65..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogManager.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.callback.LogSegmentListener;
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.namespace.NamespaceDriver;
-import com.twitter.distributedlog.subscription.SubscriptionStateStore;
-import com.twitter.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.util.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A DistributedLogManager is responsible for managing a single place of storing
- * edit logs. It may correspond to multiple files, a backup node, etc.
- * Even when the actual underlying storage is rolled, or failed and restored,
- * each conceptual place of storage corresponds to exactly one instance of
- * this class, which is created when the EditLog is first opened.
- */
-public interface DistributedLogManager extends AsyncCloseable, Closeable {
-
- /**
- * Get the name of the stream managed by this log manager
- * @return streamName
- */
- public String getStreamName();
-
- /**
- * Get the namespace driver used by this manager.
- *
- * @return the namespace driver
- */
- public NamespaceDriver getNamespaceDriver();
-
- /**
- * Get log segments.
- *
- * @return log segments
- * @throws IOException
- */
- public List<LogSegmentMetadata> getLogSegments() throws IOException;
-
- /**
- * Register <i>listener</i> on log segment updates of this stream.
- *
- * @param listener
- * listener to receive update log segment list.
- */
- public void registerListener(LogSegmentListener listener) throws IOException ;
-
- /**
- * Unregister <i>listener</i> on log segment updates from this stream.
- *
- * @param listener
- * listener to receive update log segment list.
- */
- public void unregisterListener(LogSegmentListener listener);
-
- /**
- * Open async log writer to write records to the log stream.
- *
- * @return result represents the open result
- */
- public Future<AsyncLogWriter> openAsyncLogWriter();
-
- /**
- * Begin writing to the log stream identified by the name
- *
- * @return the writer interface to generate log records
- */
- public LogWriter startLogSegmentNonPartitioned() throws IOException;
-
- /**
- * Begin writing to the log stream identified by the name
- *
- * @return the writer interface to generate log records
- */
- // @Deprecated
- public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;
-
- /**
- * Begin appending to the end of the log stream which is being treated as a sequence of bytes
- *
- * @return the writer interface to generate log records
- */
- public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;
-
- /**
- * Get a reader to read a log stream as a sequence of bytes
- *
- * @return the writer interface to generate log records
- */
- public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;
-
- /**
- * Get the input stream starting with fromTxnId for the specified log
- *
- * @param fromTxnId - the first transaction id we want to read
- * @return the stream starting with transaction fromTxnId
- * @throws IOException if a stream cannot be found.
- */
- public LogReader getInputStream(long fromTxnId)
- throws IOException;
-
- public LogReader getInputStream(DLSN fromDLSN) throws IOException;
-
- /**
- * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
- *
- * @param fromTxnId
- * transaction id to start reading from
- * @return async log reader
- */
- public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId);
-
- /**
- * Open an async log reader to read records from a log starting from <code>fromDLSN</code>
- *
- * @param fromDLSN
- * dlsn to start reading from
- * @return async log reader
- */
- public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);
-
- // @Deprecated
- public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;
-
- // @Deprecated
- public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;
-
- public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN);
-
- /**
- * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
- * If two readers tried to open using same subscriberId, one would succeed, while the other
- * will be blocked until it gets the lock.
- *
- * @param fromDLSN
- * start dlsn
- * @param subscriberId
- * subscriber id
- * @return async log reader
- */
- public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);
-
- /**
- * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
- * its last commit position recorded in subscription store. If no last commit position found
- * in subscription store, it would start reading from head of the stream.
- *
- * If the two readers tried to open using same subscriberId, one would succeed, while the other
- * will be blocked until it gets the lock.
- *
- * @param subscriberId
- * subscriber id
- * @return async log reader
- */
- public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);
-
- /**
- * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
- *
- * @param transactionId
- * transaction id
- * @return dlsn of first log record whose transaction id is not less than transactionId.
- */
- public Future<DLSN> getDLSNNotLessThanTxId(long transactionId);
-
- /**
- * Get the last log record in the stream
- *
- * @return the last log record in the stream
- * @throws IOException if a stream cannot be found.
- */
- public LogRecordWithDLSN getLastLogRecord()
- throws IOException;
-
- /**
- * Get the earliest Transaction Id available in the log
- *
- * @return earliest transaction id
- * @throws IOException
- */
- public long getFirstTxId() throws IOException;
-
- /**
- * Get Latest Transaction Id in the log
- *
- * @return latest transaction id
- * @throws IOException
- */
- public long getLastTxId() throws IOException;
-
- /**
- * Get Latest DLSN in the log
- *
- * @return last dlsn
- * @throws IOException
- */
- public DLSN getLastDLSN() throws IOException;
-
- /**
- * Get Latest log record with DLSN in the log - async
- *
- * @return latest log record with DLSN
- */
- public Future<LogRecordWithDLSN> getLastLogRecordAsync();
-
- /**
- * Get Latest Transaction Id in the log - async
- *
- * @return latest transaction id
- */
- public Future<Long> getLastTxIdAsync();
-
- /**
- * Get first DLSN in the log.
- *
- * @return first dlsn in the stream
- */
- public Future<DLSN> getFirstDLSNAsync();
-
- /**
- * Get Latest DLSN in the log - async
- *
- * @return latest transaction id
- */
- public Future<DLSN> getLastDLSNAsync();
-
- /**
- * Get the number of log records in the active portion of the log
- * Any log segments that have already been truncated will not be included
- *
- * @return number of log records
- * @throws IOException
- */
- public long getLogRecordCount() throws IOException;
-
- /**
- * Get the number of log records in the active portion of the log - async.
- * Any log segments that have already been truncated will not be included
- *
- * @return future number of log records
- * @throws IOException
- */
- public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN);
-
- /**
- * Run recovery on the log.
- *
- * @throws IOException
- */
- public void recover() throws IOException;
-
- /**
- * Check if an end of stream marker was added to the stream
- * A stream with an end of stream marker cannot be appended to
- *
- * @return true if the marker was added to the stream, false otherwise
- * @throws IOException
- */
- public boolean isEndOfStreamMarked() throws IOException;
-
- /**
- * Delete the log.
- *
- * @throws IOException if the deletion fails
- */
- public void delete() throws IOException;
-
- /**
- * The DistributedLogManager may archive/purge any logs for transactionId
- * less than or equal to minImageTxId.
- * This is to be used only when the client explicitly manages deletion. If
- * the cleanup policy is based on sliding time window, then this method need
- * not be called.
- *
- * @param minTxIdToKeep the earliest txid that must be retained
- * @throws IOException if purging fails
- */
- public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
-
- /**
- * Get the subscriptions store provided by the distributedlog manager.
- *
- * @return subscriptions store manages subscriptions for current stream.
- */
- public SubscriptionsStore getSubscriptionsStore();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
deleted file mode 100644
index bf315fc..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java
+++ /dev/null
@@ -1,403 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import javax.annotation.Nullable;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * A set of {@link LogRecord}s.
- */
-public class Entry {
-
- /**
- * Create a new log record set.
- *
- * @param logName
- * name of the log
- * @param initialBufferSize
- * initial buffer size
- * @param envelopeBeforeTransmit
- * if envelope the buffer before transmit
- * @param codec
- * compression codec
- * @param statsLogger
- * stats logger to receive stats
- * @return writer to build a log record set.
- */
- public static Writer newEntry(
- String logName,
- int initialBufferSize,
- boolean envelopeBeforeTransmit,
- CompressionCodec.Type codec,
- StatsLogger statsLogger) {
- return new EnvelopedEntryWriter(
- logName,
- initialBufferSize,
- envelopeBeforeTransmit,
- codec,
- statsLogger);
- }
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * Build the record set object.
- */
- public static class Builder {
-
- private long logSegmentSequenceNumber = -1;
- private long entryId = -1;
- private long startSequenceId = Long.MIN_VALUE;
- private boolean envelopeEntry = true;
- // input stream
- private InputStream in = null;
- // or bytes array
- private byte[] data = null;
- private int offset = -1;
- private int length = -1;
- private Optional<Long> txidToSkipTo = Optional.absent();
- private Optional<DLSN> dlsnToSkipTo = Optional.absent();
- private boolean deserializeRecordSet = true;
-
- private Builder() {}
-
- /**
- * Reset the builder.
- *
- * @return builder
- */
- public Builder reset() {
- logSegmentSequenceNumber = -1;
- entryId = -1;
- startSequenceId = Long.MIN_VALUE;
- envelopeEntry = true;
- // input stream
- in = null;
- // or bytes array
- data = null;
- offset = -1;
- length = -1;
- txidToSkipTo = Optional.absent();
- dlsnToSkipTo = Optional.absent();
- return this;
- }
-
- /**
- * Set the segment info of the log segment that this record
- * set belongs to.
- *
- * @param lssn
- * log segment sequence number
- * @param startSequenceId
- * start sequence id of this log segment
- * @return builder
- */
- public Builder setLogSegmentInfo(long lssn, long startSequenceId) {
- this.logSegmentSequenceNumber = lssn;
- this.startSequenceId = startSequenceId;
- return this;
- }
-
- /**
- * Set the entry id of this log record set.
- *
- * @param entryId
- * entry id assigned for this log record set.
- * @return builder
- */
- public Builder setEntryId(long entryId) {
- this.entryId = entryId;
- return this;
- }
-
- /**
- * Set whether this record set is enveloped or not.
- *
- * @param enabled
- * flag indicates whether this record set is enveloped or not.
- * @return builder
- */
- public Builder setEnvelopeEntry(boolean enabled) {
- this.envelopeEntry = enabled;
- return this;
- }
-
- /**
- * Set the serialized bytes data of this record set.
- *
- * @param data
- * serialized bytes data of this record set.
- * @param offset
- * offset of the bytes data
- * @param length
- * length of the bytes data
- * @return builder
- */
- public Builder setData(byte[] data, int offset, int length) {
- this.data = data;
- this.offset = offset;
- this.length = length;
- return this;
- }
-
- /**
- * Set the input stream of the serialized bytes data of this record set.
- *
- * @param in
- * input stream
- * @return builder
- */
- public Builder setInputStream(InputStream in) {
- this.in = in;
- return this;
- }
-
- /**
- * Set the record set starts from <code>dlsn</code>.
- *
- * @param dlsn
- * dlsn to skip to
- * @return builder
- */
- public Builder skipTo(@Nullable DLSN dlsn) {
- this.dlsnToSkipTo = Optional.fromNullable(dlsn);
- return this;
- }
-
- /**
- * Set the record set starts from <code>txid</code>.
- *
- * @param txid
- * txid to skip to
- * @return builder
- */
- public Builder skipTo(long txid) {
- this.txidToSkipTo = Optional.of(txid);
- return this;
- }
-
- /**
- * Enable/disable deserialize record set.
- *
- * @param enabled
- * flag to enable/disable dserialize record set.
- * @return builder
- */
- public Builder deserializeRecordSet(boolean enabled) {
- this.deserializeRecordSet = enabled;
- return this;
- }
-
- public Entry build() {
- Preconditions.checkNotNull(data, "Serialized data isn't provided");
- Preconditions.checkArgument(offset >= 0 && length >= 0
- && (offset + length) <= data.length,
- "Invalid offset or length of serialized data");
- return new Entry(
- logSegmentSequenceNumber,
- entryId,
- startSequenceId,
- envelopeEntry,
- deserializeRecordSet,
- data,
- offset,
- length,
- txidToSkipTo,
- dlsnToSkipTo);
- }
-
- public Entry.Reader buildReader() throws IOException {
- Preconditions.checkArgument(data != null || in != null,
- "Serialized data or input stream isn't provided");
- InputStream in;
- if (null != this.in) {
- in = this.in;
- } else {
- Preconditions.checkArgument(offset >= 0 && length >= 0
- && (offset + length) <= data.length,
- "Invalid offset or length of serialized data");
- in = new ByteArrayInputStream(data, offset, length);
- }
- return new EnvelopedEntryReader(
- logSegmentSequenceNumber,
- entryId,
- startSequenceId,
- in,
- envelopeEntry,
- deserializeRecordSet,
- NullStatsLogger.INSTANCE);
- }
-
- }
-
- private final long logSegmentSequenceNumber;
- private final long entryId;
- private final long startSequenceId;
- private final boolean envelopedEntry;
- private final boolean deserializeRecordSet;
- private final byte[] data;
- private final int offset;
- private final int length;
- private final Optional<Long> txidToSkipTo;
- private final Optional<DLSN> dlsnToSkipTo;
-
- private Entry(long logSegmentSequenceNumber,
- long entryId,
- long startSequenceId,
- boolean envelopedEntry,
- boolean deserializeRecordSet,
- byte[] data,
- int offset,
- int length,
- Optional<Long> txidToSkipTo,
- Optional<DLSN> dlsnToSkipTo) {
- this.logSegmentSequenceNumber = logSegmentSequenceNumber;
- this.entryId = entryId;
- this.startSequenceId = startSequenceId;
- this.envelopedEntry = envelopedEntry;
- this.deserializeRecordSet = deserializeRecordSet;
- this.data = data;
- this.offset = offset;
- this.length = length;
- this.txidToSkipTo = txidToSkipTo;
- this.dlsnToSkipTo = dlsnToSkipTo;
- }
-
- /**
- * Get raw data of this record set.
- *
- * @return raw data representation of this record set.
- */
- public byte[] getRawData() {
- return data;
- }
-
- /**
- * Create reader to iterate over this record set.
- *
- * @return reader to iterate over this record set.
- * @throws IOException if the record set is invalid record set.
- */
- public Reader reader() throws IOException {
- InputStream in = new ByteArrayInputStream(data, offset, length);
- Reader reader = new EnvelopedEntryReader(
- logSegmentSequenceNumber,
- entryId,
- startSequenceId,
- in,
- envelopedEntry,
- deserializeRecordSet,
- NullStatsLogger.INSTANCE);
- if (txidToSkipTo.isPresent()) {
- reader.skipTo(txidToSkipTo.get());
- }
- if (dlsnToSkipTo.isPresent()) {
- reader.skipTo(dlsnToSkipTo.get());
- }
- return reader;
- }
-
- /**
- * Writer to append {@link LogRecord}s to {@link Entry}.
- */
- public interface Writer extends EntryBuffer {
-
- /**
- * Write a {@link LogRecord} to this record set.
- *
- * @param record
- * record to write
- * @param transmitPromise
- * callback for transmit result. the promise is only
- * satisfied when this record set is transmitted.
- * @throws LogRecordTooLongException if the record is too long
- * @throws WriteException when encountered exception writing the record
- */
- void writeRecord(LogRecord record, Promise<DLSN> transmitPromise)
- throws LogRecordTooLongException, WriteException;
-
- /**
- * Reset the writer to write records.
- */
- void reset();
-
- }
-
- /**
- * Reader to read {@link LogRecord}s from this record set.
- */
- public interface Reader {
-
- /**
- * Get the log segment sequence number.
- *
- * @return the log segment sequence number.
- */
- long getLSSN();
-
- /**
- * Return the entry id.
- *
- * @return the entry id.
- */
- long getEntryId();
-
- /**
- * Read next log record from this record set.
- *
- * @return next log record from this record set.
- */
- LogRecordWithDLSN nextRecord() throws IOException;
-
- /**
- * Skip the reader to the record whose transaction id is <code>txId</code>.
- *
- * @param txId
- * transaction id to skip to.
- * @return true if skip succeeds, otherwise false.
- * @throws IOException
- */
- boolean skipTo(long txId) throws IOException;
-
- /**
- * Skip the reader to the record whose DLSN is <code>dlsn</code>.
- *
- * @param dlsn
- * DLSN to skip to.
- * @return true if skip succeeds, otherwise false.
- * @throws IOException
- */
- boolean skipTo(DLSN dlsn) throws IOException;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java
deleted file mode 100644
index 394fbad..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryBuffer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import com.twitter.distributedlog.io.Buffer;
-import com.twitter.distributedlog.io.TransmitListener;
-
-import java.io.IOException;
-
-/**
- * Write representation of a {@link Entry}.
- * It is a buffer of log record set, used for transmission.
- */
-public interface EntryBuffer extends TransmitListener {
-
- /**
- * Return if this record set contains user records.
- *
- * @return true if this record set contains user records, otherwise
- * return false.
- */
- boolean hasUserRecords();
-
- /**
- * Return number of records in current record set.
- *
- * @return number of records in current record set.
- */
- int getNumRecords();
-
- /**
- * Return number of bytes in current record set.
- *
- * @return number of bytes in current record set.
- */
- int getNumBytes();
-
- /**
- * Return max tx id in current record set.
- *
- * @return max tx id.
- */
- long getMaxTxId();
-
- /**
- * Get the buffer to transmit.
- *
- * @return the buffer to transmit.
- * @throws InvalidEnvelopedEntryException if the record set buffer is invalid
- * @throws IOException when encountered IOException during serialization
- */
- Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
deleted file mode 100644
index 0a15d29..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-/**
- * The position of an entry, identified by log segment sequence number and entry id.
- */
-class EntryPosition {
-
- private long lssn;
- private long entryId;
-
- EntryPosition(long lssn, long entryId) {
- this.lssn = lssn;
- this.entryId = entryId;
- }
-
- public synchronized long getLogSegmentSequenceNumber() {
- return lssn;
- }
-
- public synchronized long getEntryId() {
- return entryId;
- }
-
- public synchronized boolean advance(long lssn, long entryId) {
- if (lssn == this.lssn) {
- if (entryId <= this.entryId) {
- return false;
- }
- this.entryId = entryId;
- return true;
- } else if (lssn > this.lssn) {
- this.lssn = lssn;
- this.entryId = entryId;
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("(").append(lssn).append(", ").append(entryId).append(")");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java
deleted file mode 100644
index 55d3be9..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntry.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import com.google.common.base.Preconditions;
-
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.distributedlog.io.CompressionUtils;
-import com.twitter.distributedlog.util.BitMaskUtils;
-
-/**
- * An enveloped entry written to BookKeeper.
- *
- * Data type in brackets. Interpretation should be on the basis of data types and not individual
- * bytes to honor Endianness.
- *
- * Entry Structure:
- * ---------------
- * Bytes 0 : Version (Byte)
- * Bytes 1 - (DATA = 1+Header.length-1) : Header (Integer)
- * Bytes DATA - DATA+3 : Payload Length (Integer)
- * BYTES DATA+4 - DATA+4+payload.length-1 : Payload (Byte[])
- *
- * V1 Header Structure: // Offsets relative to the start of the header.
- * -------------------
- * Bytes 0 - 3 : Flags (Integer)
- * Bytes 4 - 7 : Original payload size before compression (Integer)
- *
- * Flags: // 32 Bits
- * -----
- * 0 ... 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
- * |_|
- * |
- * Compression Type
- *
- * Compression Type: // 2 Bits (Least significant)
- * ----------------
- * 00 : No Compression
- * 01 : LZ4 Compression
- * 10 : Unused
- * 11 : Unused
- */
-public class EnvelopedEntry {
-
- public static final int VERSION_LENGTH = 1; // One byte long
- public static final byte VERSION_ONE = 1;
-
- public static final byte LOWEST_SUPPORTED_VERSION = VERSION_ONE;
- public static final byte HIGHEST_SUPPORTED_VERSION = VERSION_ONE;
- public static final byte CURRENT_VERSION = VERSION_ONE;
-
- private final OpStatsLogger compressionStat;
- private final OpStatsLogger decompressionStat;
- private final Counter compressedEntryBytes;
- private final Counter decompressedEntryBytes;
- private final byte version;
-
- private Header header = new Header();
- private Payload payloadCompressed = new Payload();
- private Payload payloadDecompressed = new Payload();
-
- public EnvelopedEntry(byte version,
- StatsLogger statsLogger) throws InvalidEnvelopedEntryException {
- Preconditions.checkNotNull(statsLogger);
- if (version < LOWEST_SUPPORTED_VERSION || version > HIGHEST_SUPPORTED_VERSION) {
- throw new InvalidEnvelopedEntryException("Invalid enveloped entry version " + version + ", expected to be in [ "
- + LOWEST_SUPPORTED_VERSION + " ~ " + HIGHEST_SUPPORTED_VERSION + " ]");
- }
- this.version = version;
- this.compressionStat = statsLogger.getOpStatsLogger("compression_time");
- this.decompressionStat = statsLogger.getOpStatsLogger("decompression_time");
- this.compressedEntryBytes = statsLogger.getCounter("compressed_bytes");
- this.decompressedEntryBytes = statsLogger.getCounter("decompressed_bytes");
- }
-
- /**
- * @param statsLogger
- * Used for getting stats for (de)compression time
- * @param compressionType
- * The compression type to use
- * @param decompressed
- * The decompressed payload
- * NOTE: The size of the byte array passed as the decompressed payload can be larger
- * than the actual contents to be compressed.
- */
- public EnvelopedEntry(byte version,
- CompressionCodec.Type compressionType,
- byte[] decompressed,
- int length,
- StatsLogger statsLogger)
- throws InvalidEnvelopedEntryException {
- this(version, statsLogger);
- Preconditions.checkNotNull(compressionType);
- Preconditions.checkNotNull(decompressed);
- Preconditions.checkArgument(length >= 0, "Invalid bytes length " + length);
-
- this.header = new Header(compressionType, length);
- this.payloadDecompressed = new Payload(length, decompressed);
- }
-
- private boolean isReady() {
- return (header.ready && payloadDecompressed.ready);
- }
-
- @Compression
- public void writeFully(DataOutputStream out) throws IOException {
- Preconditions.checkNotNull(out);
- if (!isReady()) {
- throw new IOException("Entry not writable");
- }
- // Version
- out.writeByte(version);
- // Header
- header.write(out);
- // Compress
- CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
- byte[] compressed = codec.compress(
- payloadDecompressed.payload,
- 0,
- payloadDecompressed.length,
- compressionStat);
- this.payloadCompressed = new Payload(compressed.length, compressed);
- this.compressedEntryBytes.add(payloadCompressed.length);
- this.decompressedEntryBytes.add(payloadDecompressed.length);
- payloadCompressed.write(out);
- }
-
- @Compression
- public void readFully(DataInputStream in) throws IOException {
- Preconditions.checkNotNull(in);
- // Make sure we're reading the right versioned entry.
- byte version = in.readByte();
- if (version != this.version) {
- throw new IOException(String.format("Version mismatch while reading. Received: %d," +
- " Required: %d", version, this.version));
- }
- header.read(in);
- payloadCompressed.read(in);
- // Decompress
- CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
- byte[] decompressed = codec.decompress(
- payloadCompressed.payload,
- 0,
- payloadCompressed.length,
- header.decompressedSize,
- decompressionStat);
- this.payloadDecompressed = new Payload(decompressed.length, decompressed);
- this.compressedEntryBytes.add(payloadCompressed.length);
- this.decompressedEntryBytes.add(payloadDecompressed.length);
- }
-
- public byte[] getDecompressedPayload() throws IOException {
- if (!isReady()) {
- throw new IOException("Decompressed payload is not initialized");
- }
- return payloadDecompressed.payload;
- }
-
- public static class Header {
- public static final int COMPRESSION_CODEC_MASK = 0x3;
- public static final int COMPRESSION_CODEC_NONE = 0x0;
- public static final int COMPRESSION_CODEC_LZ4 = 0x1;
-
- private int flags = 0;
- private int decompressedSize = 0;
- private CompressionCodec.Type compressionType = CompressionCodec.Type.UNKNOWN;
-
- // Whether this struct is ready for reading/writing.
- private boolean ready = false;
-
- // Used while reading.
- public Header() {
- }
-
- public Header(CompressionCodec.Type compressionType,
- int decompressedSize) {
- this.compressionType = compressionType;
- this.decompressedSize = decompressedSize;
- this.flags = 0;
- switch (compressionType) {
- case NONE:
- this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
- COMPRESSION_CODEC_NONE);
- break;
- case LZ4:
- this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
- COMPRESSION_CODEC_LZ4);
- break;
- default:
- throw new RuntimeException(String.format("Unknown Compression Type: %s",
- compressionType));
- }
- // This can now be written.
- this.ready = true;
- }
-
- private void write(DataOutputStream out) throws IOException {
- out.writeInt(flags);
- out.writeInt(decompressedSize);
- }
-
- private void read(DataInputStream in) throws IOException {
- this.flags = in.readInt();
- int compressionType = (int) BitMaskUtils.get(flags, COMPRESSION_CODEC_MASK);
- if (compressionType == COMPRESSION_CODEC_NONE) {
- this.compressionType = CompressionCodec.Type.NONE;
- } else if (compressionType == COMPRESSION_CODEC_LZ4) {
- this.compressionType = CompressionCodec.Type.LZ4;
- } else {
- throw new IOException(String.format("Unsupported Compression Type: %s",
- compressionType));
- }
- this.decompressedSize = in.readInt();
- // Values can now be read.
- this.ready = true;
- }
- }
-
- public static class Payload {
- private int length = 0;
- private byte[] payload = null;
-
- // Whether this struct is ready for reading/writing.
- private boolean ready = false;
-
- // Used for reading
- Payload() {
- }
-
- Payload(int length, byte[] payload) {
- this.length = length;
- this.payload = payload;
- this.ready = true;
- }
-
- private void write(DataOutputStream out) throws IOException {
- out.writeInt(length);
- out.write(payload, 0, length);
- }
-
- private void read(DataInputStream in) throws IOException {
- this.length = in.readInt();
- this.payload = new byte[length];
- in.readFully(payload);
- this.ready = true;
- }
- }
-
- /**
- * Return an InputStream that reads from the provided InputStream, decompresses the data
- * and returns a new InputStream wrapping the underlying payload.
- *
- * Note that src is modified by this call.
- *
- * @return
- * New Input stream with the underlying payload.
- * @throws Exception
- */
- public static InputStream fromInputStream(InputStream src,
- StatsLogger statsLogger) throws IOException {
- src.mark(VERSION_LENGTH);
- byte version = new DataInputStream(src).readByte();
- src.reset();
- EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger);
- entry.readFully(new DataInputStream(src));
- return new ByteArrayInputStream(entry.getDecompressedPayload());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
deleted file mode 100644
index 038bb18..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Record reader to read records from an enveloped entry buffer.
- */
-class EnvelopedEntryReader implements Entry.Reader, RecordStream {
-
- private final long logSegmentSeqNo;
- private final long entryId;
- private final LogRecord.Reader reader;
-
- // slot id
- private long slotId = 0;
-
- EnvelopedEntryReader(long logSegmentSeqNo,
- long entryId,
- long startSequenceId,
- InputStream in,
- boolean envelopedEntry,
- boolean deserializeRecordSet,
- StatsLogger statsLogger)
- throws IOException {
- this.logSegmentSeqNo = logSegmentSeqNo;
- this.entryId = entryId;
- InputStream src = in;
- if (envelopedEntry) {
- src = EnvelopedEntry.fromInputStream(in, statsLogger);
- }
- this.reader = new LogRecord.Reader(
- this,
- new DataInputStream(src),
- startSequenceId,
- deserializeRecordSet);
- }
-
- @Override
- public long getLSSN() {
- return logSegmentSeqNo;
- }
-
- @Override
- public long getEntryId() {
- return entryId;
- }
-
- @Override
- public LogRecordWithDLSN nextRecord() throws IOException {
- return reader.readOp();
- }
-
- @Override
- public boolean skipTo(long txId) throws IOException {
- return reader.skipTo(txId, true);
- }
-
- @Override
- public boolean skipTo(DLSN dlsn) throws IOException {
- return reader.skipTo(dlsn);
- }
-
- //
- // Record Stream
- //
-
- @Override
- public void advance(int numRecords) {
- slotId += numRecords;
- }
-
- @Override
- public DLSN getCurrentPosition() {
- return new DLSN(logSegmentSeqNo, entryId, slotId);
- }
-
- @Override
- public String getName() {
- return "EnvelopedReader";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java
deleted file mode 100644
index 01a91ab..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryWriter.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.Entry.Writer;
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.exceptions.WriteCancelledException;
-import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.io.Buffer;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-
-/**
- * {@link com.twitter.distributedlog.io.Buffer} based log record set writer.
- */
-class EnvelopedEntryWriter implements Writer {
-
- static final Logger logger = LoggerFactory.getLogger(EnvelopedEntryWriter.class);
-
- private static class WriteRequest {
-
- private final int numRecords;
- private final Promise<DLSN> promise;
-
- WriteRequest(int numRecords, Promise<DLSN> promise) {
- this.numRecords = numRecords;
- this.promise = promise;
- }
-
- }
-
- private final String logName;
- private final Buffer buffer;
- private final LogRecord.Writer writer;
- private final List<WriteRequest> writeRequests;
- private final boolean envelopeBeforeTransmit;
- private final CompressionCodec.Type codec;
- private final StatsLogger statsLogger;
- private int count = 0;
- private boolean hasUserData = false;
- private long maxTxId = Long.MIN_VALUE;
-
- EnvelopedEntryWriter(String logName,
- int initialBufferSize,
- boolean envelopeBeforeTransmit,
- CompressionCodec.Type codec,
- StatsLogger statsLogger) {
- this.logName = logName;
- this.buffer = new Buffer(initialBufferSize * 6 / 5);
- this.writer = new LogRecord.Writer(new DataOutputStream(buffer));
- this.writeRequests = new LinkedList<WriteRequest>();
- this.envelopeBeforeTransmit = envelopeBeforeTransmit;
- this.codec = codec;
- this.statsLogger = statsLogger;
- }
-
- @Override
- public synchronized void reset() {
- cancelPromises(new WriteCancelledException(logName, "Record Set is reset"));
- count = 0;
- this.buffer.reset();
- }
-
- @Override
- public synchronized void writeRecord(LogRecord record,
- Promise<DLSN> transmitPromise)
- throws LogRecordTooLongException, WriteException {
- int logRecordSize = record.getPersistentSize();
- if (logRecordSize > MAX_LOGRECORD_SIZE) {
- throw new LogRecordTooLongException(
- "Log Record of size " + logRecordSize + " written when only "
- + MAX_LOGRECORD_SIZE + " is allowed");
- }
-
- try {
- this.writer.writeOp(record);
- int numRecords = 1;
- if (!record.isControl()) {
- hasUserData = true;
- }
- if (record.isRecordSet()) {
- numRecords = LogRecordSet.numRecords(record);
- }
- count += numRecords;
- writeRequests.add(new WriteRequest(numRecords, transmitPromise));
- maxTxId = Math.max(maxTxId, record.getTransactionId());
- } catch (IOException e) {
- logger.error("Failed to append record to record set of {} : ",
- logName, e);
- throw new WriteException(logName, "Failed to append record to record set of "
- + logName);
- }
- }
-
- private synchronized void satisfyPromises(long lssn, long entryId) {
- long nextSlotId = 0;
- for (WriteRequest request : writeRequests) {
- request.promise.setValue(new DLSN(lssn, entryId, nextSlotId));
- nextSlotId += request.numRecords;
- }
- writeRequests.clear();
- }
-
- private synchronized void cancelPromises(Throwable reason) {
- for (WriteRequest request : writeRequests) {
- request.promise.setException(reason);
- }
- writeRequests.clear();
- }
-
- @Override
- public synchronized long getMaxTxId() {
- return maxTxId;
- }
-
- @Override
- public synchronized boolean hasUserRecords() {
- return hasUserData;
- }
-
- @Override
- public int getNumBytes() {
- return buffer.size();
- }
-
- @Override
- public synchronized int getNumRecords() {
- return count;
- }
-
- @Override
- public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException {
- if (!envelopeBeforeTransmit) {
- return buffer;
- }
- // We can't escape this allocation because things need to be read from one byte array
- // and then written to another. This is the destination.
- Buffer toSend = new Buffer(buffer.size());
- byte[] decompressed = buffer.getData();
- int length = buffer.size();
- EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
- codec,
- decompressed,
- length,
- statsLogger);
- // This will cause an allocation of a byte[] for compression. This can be avoided
- // but we can do that later only if needed.
- entry.writeFully(new DataOutputStream(toSend));
- return toSend;
- }
-
- @Override
- public synchronized DLSN finalizeTransmit(long lssn, long entryId) {
- return new DLSN(lssn, entryId, count - 1);
- }
-
- @Override
- public void completeTransmit(long lssn, long entryId) {
- satisfyPromises(lssn, entryId);
- }
-
- @Override
- public void abortTransmit(Throwable reason) {
- cancelPromises(reason);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java
deleted file mode 100644
index 550d314..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LedgerReadPosition.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LedgerReadPosition {
- static final Logger LOG = LoggerFactory.getLogger(LedgerReadPosition.class);
-
- private static enum PartialOrderingComparisonResult {
- NotComparable,
- GreaterThan,
- LessThan,
- EqualTo
- }
-
- long ledgerId = DistributedLogConstants.UNRESOLVED_LEDGER_ID;
- long logSegmentSequenceNo;
- long entryId;
-
- public LedgerReadPosition(long ledgerId, long logSegmentSequenceNo, long entryId) {
- this.ledgerId = ledgerId;
- this.logSegmentSequenceNo = logSegmentSequenceNo;
- this.entryId = entryId;
- }
-
- public LedgerReadPosition(LedgerReadPosition that) {
- this.ledgerId = that.ledgerId;
- this.logSegmentSequenceNo = that.logSegmentSequenceNo;
- this.entryId = that.entryId;
- }
-
-
- public LedgerReadPosition(final DLSN dlsn) {
- this(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId());
- }
-
- public LedgerReadPosition(long logSegmentSequenceNo, long entryId) {
- this.logSegmentSequenceNo = logSegmentSequenceNo;
- this.entryId = entryId;
- }
-
- public long getLedgerId() {
- if (DistributedLogConstants.UNRESOLVED_LEDGER_ID == ledgerId) {
- LOG.trace("Ledger Id is not initialized");
- throw new IllegalStateException("Ledger Id is not initialized");
- }
- return ledgerId;
- }
-
- public long getLogSegmentSequenceNumber() {
- return logSegmentSequenceNo;
- }
-
- public long getEntryId() {
- return entryId;
- }
-
- public void advance() {
- entryId++;
- }
-
- public void positionOnNewLogSegment(long ledgerId, long logSegmentSequenceNo) {
- this.ledgerId = ledgerId;
- this.logSegmentSequenceNo = logSegmentSequenceNo;
- this.entryId = 0L;
- }
-
- @Override
- public String toString() {
- return String.format("(lid=%d, lseqNo=%d, eid=%d)", ledgerId, logSegmentSequenceNo, entryId);
- }
-
- public boolean definitelyLessThanOrEqualTo(LedgerReadPosition threshold) {
- PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold);
- return ((result == PartialOrderingComparisonResult.LessThan) ||
- (result == PartialOrderingComparisonResult.EqualTo));
- }
-
- public boolean definitelyLessThan(LedgerReadPosition threshold) {
- PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold);
- return result == PartialOrderingComparisonResult.LessThan;
- }
-
- private PartialOrderingComparisonResult comparePartiallyOrdered(LedgerReadPosition threshold) {
- // If no threshold is passed we cannot make a definitive comparison
- if (null == threshold) {
- return PartialOrderingComparisonResult.NotComparable;
- }
-
- if (this.logSegmentSequenceNo != threshold.logSegmentSequenceNo) {
- if (this.logSegmentSequenceNo < threshold.logSegmentSequenceNo) {
- return PartialOrderingComparisonResult.LessThan;
- } else {
- return PartialOrderingComparisonResult.GreaterThan;
- }
- } else if (this.ledgerId != threshold.ledgerId) {
- // When logSegmentSequenceNo is equal we cannot definitely say that this
- // position is less than the threshold unless ledgerIds are equal
- // since LogSegmentSequenceNumber maybe inferred from transactionIds in older
- // versions of the metadata.
- return PartialOrderingComparisonResult.NotComparable;
- } else if (this.getEntryId() < threshold.getEntryId()) {
- return PartialOrderingComparisonResult.LessThan;
- } else if (this.getEntryId() > threshold.getEntryId()) {
- return PartialOrderingComparisonResult.GreaterThan;
- } else {
- return PartialOrderingComparisonResult.EqualTo;
- }
- }
-
- /**
- * Comparator for the key portion
- */
- public static final ReadAheadCacheKeyComparator COMPARATOR = new ReadAheadCacheKeyComparator();
-
- // Only compares the key portion
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof LedgerReadPosition)) {
- return false;
- }
- LedgerReadPosition key = (LedgerReadPosition) other;
- return ledgerId == key.ledgerId &&
- entryId == key.entryId;
- }
-
- @Override
- public int hashCode() {
- return (int) (ledgerId * 13 ^ entryId * 17);
- }
-
- /**
- * Compare EntryKey.
- */
- protected static class ReadAheadCacheKeyComparator implements Comparator<LedgerReadPosition>, Serializable {
-
- private static final long serialVersionUID = 0L;
-
- @Override
- public int compare(LedgerReadPosition left, LedgerReadPosition right) {
- long ret = left.ledgerId - right.ledgerId;
- if (ret == 0) {
- ret = left.entryId - right.entryId;
- }
- return (ret < 0) ? -1 : ((ret > 0) ? 1 : 0);
- }
- }
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
deleted file mode 100644
index f4a1e41..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LocalDLMEmulator.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.google.common.base.Optional;
-import com.twitter.distributedlog.impl.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.DLMetadata;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
-import org.apache.bookkeeper.util.IOUtils;
-import org.apache.bookkeeper.util.LocalBookKeeper;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utility class for setting up bookkeeper ensembles
- * and bringing individual bookies up and down
- */
-public class LocalDLMEmulator {
- private static final Logger LOG = LoggerFactory.getLogger(LocalDLMEmulator.class);
-
- public static final String DLOG_NAMESPACE = "/messaging/distributedlog";
-
- private static final int DEFAULT_BOOKIE_INITIAL_PORT = 0; // Use ephemeral ports
- private static final int DEFAULT_ZK_TIMEOUT_SEC = 10;
- private static final int DEFAULT_ZK_PORT = 2181;
- private static final String DEFAULT_ZK_HOST = "127.0.0.1";
- private static final String DEFAULT_ZK_ENSEMBLE = DEFAULT_ZK_HOST + ":" + DEFAULT_ZK_PORT;
- private static final int DEFAULT_NUM_BOOKIES = 3;
- private static final ServerConfiguration DEFAULT_SERVER_CONFIGURATION = new ServerConfiguration();
-
- private final String zkEnsemble;
- private final URI uri;
- private final List<File> tmpDirs = new ArrayList<File>();
- private final int zkTimeoutSec;
- private final Thread bkStartupThread;
- private final String zkHost;
- private final int zkPort;
- private final int numBookies;
-
- public static class Builder {
- private int zkTimeoutSec = DEFAULT_ZK_TIMEOUT_SEC;
- private int numBookies = DEFAULT_NUM_BOOKIES;
- private String zkHost = DEFAULT_ZK_HOST;
- private int zkPort = DEFAULT_ZK_PORT;
- private int initialBookiePort = DEFAULT_BOOKIE_INITIAL_PORT;
- private boolean shouldStartZK = true;
- private Optional<ServerConfiguration> serverConf = Optional.absent();
-
- public Builder numBookies(int numBookies) {
- this.numBookies = numBookies;
- return this;
- }
- public Builder zkHost(String zkHost) {
- this.zkHost = zkHost;
- return this;
- }
- public Builder zkPort(int zkPort) {
- this.zkPort = zkPort;
- return this;
- }
- public Builder zkTimeoutSec(int zkTimeoutSec) {
- this.zkTimeoutSec = zkTimeoutSec;
- return this;
- }
- public Builder initialBookiePort(int initialBookiePort) {
- this.initialBookiePort = initialBookiePort;
- return this;
- }
- public Builder shouldStartZK(boolean shouldStartZK) {
- this.shouldStartZK = shouldStartZK;
- return this;
- }
- public Builder serverConf(ServerConfiguration serverConf) {
- this.serverConf = Optional.of(serverConf);
- return this;
- }
-
- public LocalDLMEmulator build() throws Exception {
- ServerConfiguration conf = null;
- if (serverConf.isPresent()) {
- conf = serverConf.get();
- } else {
- conf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone();
- conf.setZkTimeout(zkTimeoutSec * 1000);
- }
- ServerConfiguration newConf = new ServerConfiguration();
- newConf.loadConf(conf);
- newConf.setAllowLoopback(true);
-
- return new LocalDLMEmulator(numBookies, shouldStartZK, zkHost, zkPort,
- initialBookiePort, zkTimeoutSec, newConf);
- }
- }
-
- public static Builder newBuilder() {
- return new Builder();
- }
-
- private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort, final int zkTimeoutSec, final ServerConfiguration serverConf) throws Exception {
- this.numBookies = numBookies;
- this.zkHost = zkHost;
- this.zkPort = zkPort;
- this.zkEnsemble = zkHost + ":" + zkPort;
- this.uri = URI.create("distributedlog://" + zkEnsemble + DLOG_NAMESPACE);
- this.zkTimeoutSec = zkTimeoutSec;
- this.bkStartupThread = new Thread() {
- public void run() {
- try {
- LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback());
- LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf);
- LOG.info("{} bookies are started.");
- } catch (InterruptedException e) {
- // go away quietly
- } catch (Exception e) {
- LOG.error("Error starting local bk", e);
- }
- }
- };
- }
-
- public void start() throws Exception {
- bkStartupThread.start();
- if (!LocalBookKeeper.waitForServerUp(zkEnsemble, zkTimeoutSec*1000)) {
- throw new Exception("Error starting zookeeper/bookkeeper");
- }
- int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec);
- assert (numBookies == bookiesUp);
- // Provision "/messaging/distributedlog" namespace
- DLMetadata.create(new BKDLConfig(zkEnsemble, "/ledgers")).create(uri);
- }
-
- public void teardown() throws Exception {
- if (bkStartupThread != null) {
- bkStartupThread.interrupt();
- bkStartupThread.join();
- }
- for (File dir : tmpDirs) {
- FileUtils.deleteDirectory(dir);
- }
- }
-
- public String getZkServers() {
- return zkEnsemble;
- }
-
- public URI getUri() {
- return uri;
- }
-
- public BookieServer newBookie() throws Exception {
- ServerConfiguration bookieConf = new ServerConfiguration();
- bookieConf.setZkTimeout(zkTimeoutSec * 1000);
- bookieConf.setBookiePort(0);
- bookieConf.setAllowLoopback(true);
- File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_",
- "test");
- if (!tmpdir.delete()) {
- LOG.debug("Fail to delete tmpdir " + tmpdir);
- }
- if (!tmpdir.mkdir()) {
- throw new IOException("Fail to create tmpdir " + tmpdir);
- }
- tmpDirs.add(tmpdir);
-
- bookieConf.setZkServers(zkEnsemble);
- bookieConf.setJournalDirName(tmpdir.getPath());
- bookieConf.setLedgerDirNames(new String[]{tmpdir.getPath()});
-
- BookieServer b = new BookieServer(bookieConf);
- b.start();
- for (int i = 0; i < 10 && !b.isRunning(); i++) {
- Thread.sleep(10000);
- }
- if (!b.isRunning()) {
- throw new IOException("Bookie would not start");
- }
- return b;
- }
-
- /**
- * Check that a number of bookies are available
- *
- * @param count number of bookies required
- * @param timeout number of seconds to wait for bookies to start
- * @throws java.io.IOException if bookies are not started by the time the timeout hits
- */
- public int checkBookiesUp(int count, int timeout) throws Exception {
- ZooKeeper zkc = connectZooKeeper(zkHost, zkPort, zkTimeoutSec);
- try {
- int mostRecentSize = 0;
- for (int i = 0; i < timeout; i++) {
- try {
- List<String> children = zkc.getChildren("/ledgers/available",
- false);
- children.remove("readonly");
- mostRecentSize = children.size();
- if ((mostRecentSize > count) || LOG.isDebugEnabled()) {
- LOG.info("Found " + mostRecentSize + " bookies up, "
- + "waiting for " + count);
- if ((mostRecentSize > count) || LOG.isTraceEnabled()) {
- for (String child : children) {
- LOG.info(" server: " + child);
- }
- }
- }
- if (mostRecentSize == count) {
- break;
- }
- } catch (KeeperException e) {
- // ignore
- }
- Thread.sleep(1000);
- }
- return mostRecentSize;
- } finally {
- zkc.close();
- }
- }
-
- public static String getBkLedgerPath() {
- return "/ledgers";
- }
-
- public static ZooKeeper connectZooKeeper(String zkHost, int zkPort)
- throws IOException, KeeperException, InterruptedException {
- return connectZooKeeper(zkHost, zkPort, DEFAULT_ZK_TIMEOUT_SEC);
- }
-
- public static ZooKeeper connectZooKeeper(String zkHost, int zkPort, int zkTimeoutSec)
- throws IOException, KeeperException, InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- final String zkHostPort = zkHost + ":" + zkPort;
-
- ZooKeeper zkc = new ZooKeeper(zkHostPort, zkTimeoutSec * 1000, new Watcher() {
- public void process(WatchedEvent event) {
- if (event.getState() == Event.KeeperState.SyncConnected) {
- latch.countDown();
- }
- }
- });
- if (!latch.await(zkTimeoutSec, TimeUnit.SECONDS)) {
- throw new IOException("Zookeeper took too long to connect");
- }
- return zkc;
- }
-
- public static URI createDLMURI(String path) throws Exception {
- return createDLMURI(DEFAULT_ZK_ENSEMBLE, path);
- }
-
- public static URI createDLMURI(String zkServers, String path) throws Exception {
- return URI.create("distributedlog://" + zkServers + DLOG_NAMESPACE + path);
- }
-
- /**
- * Try to start zookkeeper locally on any port.
- */
- public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(File zkDir) throws Exception {
- return runZookeeperOnAnyPort((int) (Math.random()*10000+7000), zkDir);
- }
-
- /**
- * Try to start zookkeeper locally on any port beginning with some base port.
- * Dump some socket info when bind fails.
- */
- public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(int basePort, File zkDir) throws Exception {
-
- final int MAX_RETRIES = 20;
- final int MIN_PORT = 1025;
- final int MAX_PORT = 65535;
- ZooKeeperServerShim zks = null;
- int zkPort = basePort;
- boolean success = false;
- int retries = 0;
-
- while (!success) {
- try {
- LOG.info("zk trying to bind to port " + zkPort);
- zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkDir);
- success = true;
- } catch (BindException be) {
- retries++;
- if (retries > MAX_RETRIES) {
- throw be;
- }
- zkPort++;
- if (zkPort > MAX_PORT) {
- zkPort = MIN_PORT;
- }
- }
- }
-
- return Pair.of(zks, zkPort);
- }
-
- public static void main(String[] args) throws Exception {
- try {
- if (args.length < 1) {
- System.out.println("Usage: LocalDLEmulator <zk_port>");
- System.exit(-1);
- }
-
- final int zkPort = Integer.parseInt(args[0]);
- final File zkDir = IOUtils.createTempDir("distrlog", "zookeeper");
- final LocalDLMEmulator localDlm = LocalDLMEmulator.newBuilder()
- .zkPort(zkPort)
- .build();
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- try {
- localDlm.teardown();
- FileUtils.deleteDirectory(zkDir);
- System.out.println("ByeBye!");
- } catch (Exception e) {
- // do nothing
- }
- }
- });
- localDlm.start();
-
- System.out.println(String.format(
- "DistributedLog Sandbox is running now. You could access distributedlog://%s:%s",
- DEFAULT_ZK_HOST,
- zkPort));
- } catch (Exception ex) {
- System.out.println("Exception occurred running emulator " + ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java
deleted file mode 100644
index c12de29..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/LogReader.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import com.twitter.distributedlog.io.AsyncCloseable;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
- *
- * <h3>Lifecycle of a Reader</h3>
- *
- * A reader is a <i>sequential</i> reader that read records from a DL log starting
- * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)}
- * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}.
- * <p>
- * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
- * to read records out the log from provided position.
- * <p>
- * Closing the reader (via {@link #close()} will release all the resources occupied
- * by this reader instance.
- * <p>
- * Exceptions could be thrown during reading records. Once the exception is thrown,
- * the reader is set to an error state and it isn't usable anymore. It is the application's
- * responsibility to handle the exceptions and re-create readers if necessary.
- * <p>
- * Example:
- * <pre>
- * DistributedLogManager dlm = ...;
- * long nextTxId = ...;
- * LogReader reader = dlm.getInputStream(nextTxId);
- *
- * while (true) { // keep reading & processing records
- * LogRecord record;
- * try {
- * record = reader.readNext(false);
- * nextTxId = record.getTransactionId();
- * // process the record
- * ...
- * } catch (IOException ioe) {
- * // handle the exception
- * ...
- * reader = dlm.getInputStream(nextTxId + 1);
- * }
- * }
- *
- * </pre>
- *
- * <h3>Read Records</h3>
- *
- * Reading records from an <i>endless</i> log in `synchronous` way isn't as
- * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it
- * lacks of callback mechanism. LogReader introduces a flag `nonBlocking` on
- * controlling the <i>waiting</i> behavior on `synchronous` reads.
- *
- * <h4>Blocking vs NonBlocking</h4>
- *
- * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
- * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true)
- * means the reads will only check readahead cache and return whatever records
- * available in the readahead cache.
- * <p>
- * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is
- * catching up with writer (there are records in the log), the read call will
- * wait until records are read and returned. If the reader is caught up with
- * writer (there are no more records in the log at read time), the read call
- * will wait for a small period of time (defined in
- * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever
- * records available in the readahead cache. In other words, if a reader sees
- * no record on blocking reads, it means the reader is `caught-up` with the
- * writer.
- * <p>
- * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
- * state machines. Applications could use <i>blocking</i> reads till caught up
- * with latest data. Once they are caught up with latest data, they could start
- * serving their service and turn to <i>non-blocking</i> read mode and tail read
- * data from the logs.
- * <p>
- * See examples below.
- *
- * <h4>Read Single Record</h4>
- *
- * {@link #readNext(boolean)} is reading individual records from a DL log.
- *
- * <pre>
- * LogReader reader = ...
- *
- * // keep reading records in blocking way until no records available in the log
- * LogRecord record = reader.readNext(false);
- * while (null != record) {
- * // process the record
- * ...
- * // read next record
- * records = reader.readNext(false);
- * }
- *
- * ...
- *
- * // reader is caught up with writer, doing non-blocking reads to tail the log
- * while (true) {
- * record = reader.readNext(true)
- * // process the new records
- * ...
- * }
- * </pre>
- *
- * <h4>Read Batch of Records</h4>
- *
- * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
- * from a DL log.
- *
- * <pre>
- * LogReader reader = ...
- * int N = 10;
- *
- * // keep reading N records in blocking way until no records available in the log
- * List<LogRecord> records = reader.readBulk(false, N);
- * while (!records.isEmpty()) {
- * // process the list of records
- * ...
- * if (records.size() < N) { // no more records available in the log
- * break;
- * }
- * // read next N records
- * records = reader.readBulk(false, N);
- * }
- *
- * ...
- *
- * // reader is caught up with writer, doing non-blocking reads to tail the log
- * while (true) {
- * records = reader.readBulk(true, N)
- * // process the new records
- * ...
- * }
- *
- * </pre>
- *
- * @see AsyncLogReader
- *
- * NOTE:
- * 1. Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
- * the {@link AsyncCloseable} interface so the reader could be closed asynchronously
- */
-public interface LogReader extends Closeable, AsyncCloseable {
-
- /**
- * Read the next log record from the stream.
- * <p>
- * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
- * records from read ahead cache. It would return <i>null</i> if there isn't any records
- * available in the read ahead cache.
- * <p>
- * If <i>nonBlocking</i> is set to false, it would does blocking call. The call will
- * block until return a record if there are records in the stream (aka catching up).
- * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
- * milliseconds and return null if there isn't any more records in the stream.
- *
- * @param nonBlocking should the read make blocking calls to the backend or rely on the
- * readAhead cache
- * @return an operation from the stream or null if at end of stream
- * @throws IOException if there is an error reading from the stream
- */
- public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
-
- /**
- * Read the next <i>numLogRecords</i> log records from the stream
- *
- * @param nonBlocking should the read make blocking calls to the backend or rely on the
- * readAhead cache
- * @param numLogRecords maximum number of log records returned by this call.
- * @return an operation from the stream or empty list if at end of stream
- * @throws IOException if there is an error reading from the stream
- * @see #readNext(boolean)
- */
- public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
-}