You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:10 UTC
[05/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java
new file mode 100644
index 0000000..f951991
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConstants.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+import static com.google.common.base.Charsets.UTF_8;
+
/**
 * Constants shared across the distributedlog core library.
 */
public class DistributedLogConstants {
    // Reusable zero-length byte array, avoids repeated empty allocations.
    public static final byte[] EMPTY_BYTES = new byte[0];
    // URI scheme prefix for distributedlog namespace URIs.
    public static final String SCHEME_PREFIX = "distributedlog";
    // Identifier for the BookKeeper storage backend.
    public static final String BACKEND_BK = "bk";
    // Sentinel marking an invalid / unset transaction id.
    public static final long INVALID_TXID = -999;
    // Sentinel transaction id recorded for a log segment containing no records.
    public static final long EMPTY_LOGSEGMENT_TX_ID = -99;
    // Largest representable transaction id.
    public static final long MAX_TXID = Long.MAX_VALUE;
    // Threshold below which a log segment is considered "small".
    // NOTE(review): units (records? entries?) not visible here — confirm at usage site.
    public static final long SMALL_LOGSEGMENT_THRESHOLD = 10;
    // Version of the log segment naming scheme.
    public static final int LOGSEGMENT_NAME_VERSION = 1;
    // Future timeout value meaning "do not wait at all".
    public static final int FUTURE_TIMEOUT_IMMEDIATE = 0;
    // Future timeout value meaning "wait forever".
    public static final int FUTURE_TIMEOUT_INFINITE = -1;
    // Lock acquisition timeouts reuse the future timeout sentinels above.
    public static final long LOCK_IMMEDIATE = FUTURE_TIMEOUT_IMMEDIATE;
    public static final long LOCK_TIMEOUT_INFINITE = FUTURE_TIMEOUT_INFINITE;
    // Default timeouts for lock operations and lock re-acquisition.
    // NOTE(review): presumably seconds — unit is not visible in this file, confirm at usage site.
    public static final long LOCK_OP_TIMEOUT_DEFAULT = 120;
    public static final long LOCK_REACQUIRE_TIMEOUT_DEFAULT = 120;
    // Placeholder client id used when the real client id is unknown.
    public static final String UNKNOWN_CLIENT_ID = "Unknown-ClientId";
    // Region id representing the local region.
    public static final int LOCAL_REGION_ID = 0;
    // Default status value assigned to a log segment.
    public static final long LOGSEGMENT_DEFAULT_STATUS = 0;
    // Log segment sequence numbers start at FIRST_LOGSEGMENT_SEQNO (1);
    // 0 marks a segment whose sequence number has not been assigned yet.
    public static final long UNASSIGNED_LOGSEGMENT_SEQNO = 0;
    // Sentinel for a sequence id that has not been assigned.
    public static final long UNASSIGNED_SEQUENCE_ID = -1L;
    public static final long FIRST_LOGSEGMENT_SEQNO = 1;
    // Sentinel ledger id for a log segment whose ledger is not yet resolved.
    public static final long UNRESOLVED_LEDGER_ID = -1;
    // Latency threshold (1 second, in millis) above which operations are
    // presumably flagged as slow — confirm against the stats/logging call sites.
    public static final long LATENCY_WARN_THRESHOLD_IN_MILLIS = TimeUnit.SECONDS.toMillis(1);
    // Distinguished result codes, placed adjacent to Integer.MIN_VALUE so they
    // cannot collide with ordinary result codes.
    public static final int DL_INTERRUPTED_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 1;
    public static final int ZK_CONNECTION_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 2;

    // Node name used for the ledger allocation pool
    // (presumably a ZooKeeper child node — confirm against the allocator code).
    public static final String ALLOCATION_POOL_NODE = ".allocation_pool";
    // log segment prefix
    public static final String INPROGRESS_LOGSEGMENT_PREFIX = "inprogress";
    public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
    // Feature name toggling whether bookie placement in a region is disallowed.
    public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
    // Payload bytes carried by control and keepalive records.
    static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
    static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);

    // An ACL that gives all permissions to node creators and read permissions only to everyone else.
    public static final List<ACL> EVERYONE_READ_CREATOR_ALL =
        ImmutableList.<ACL>builder()
        .addAll(Ids.CREATOR_ALL_ACL)
        .addAll(Ids.READ_ACL_UNSAFE)
        .build();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
new file mode 100644
index 0000000..7d33e9c
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.io.AsyncCloseable;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.subscription.SubscriptionStateStore;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+import com.twitter.util.Future;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A DistributedLogManager is responsible for managing a single place of storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or failed and restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this class, which is created when the EditLog is first opened.
+ */
public interface DistributedLogManager extends AsyncCloseable, Closeable {

    /**
     * Get the name of the stream managed by this log manager
     * @return streamName
     */
    public String getStreamName();

    /**
     * Get the namespace driver used by this manager.
     *
     * @return the namespace driver
     */
    public NamespaceDriver getNamespaceDriver();

    /**
     * Get log segments.
     *
     * @return log segments
     * @throws IOException
     */
    public List<LogSegmentMetadata> getLogSegments() throws IOException;

    /**
     * Register <i>listener</i> on log segment updates of this stream.
     *
     * @param listener
     *          listener to receive update log segment list.
     */
    public void registerListener(LogSegmentListener listener) throws IOException ;

    /**
     * Unregister <i>listener</i> on log segment updates from this stream.
     *
     * @param listener
     *          listener to receive update log segment list.
     */
    public void unregisterListener(LogSegmentListener listener);

    /**
     * Open async log writer to write records to the log stream.
     *
     * @return result represents the open result
     */
    public Future<AsyncLogWriter> openAsyncLogWriter();

    /**
     * Begin writing to the log stream identified by the name
     *
     * @return the writer interface to generate log records
     */
    public LogWriter startLogSegmentNonPartitioned() throws IOException;

    /**
     * Begin writing to the log stream identified by the name
     *
     * @return the writer interface to generate log records
     */
    // @Deprecated — prefer openAsyncLogWriter(); kept for compatibility (TODO confirm).
    public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;

    /**
     * Begin appending to the end of the log stream which is being treated as a sequence of bytes
     *
     * @return the writer interface to generate log records
     */
    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;

    /**
     * Get a reader to read a log stream as a sequence of bytes
     *
     * @return the writer interface to generate log records
     */
    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;

    /**
     * Get the input stream starting with fromTxnId for the specified log
     *
     * @param fromTxnId - the first transaction id we want to read
     * @return the stream starting with transaction fromTxnId
     * @throws IOException if a stream cannot be found.
     */
    public LogReader getInputStream(long fromTxnId)
        throws IOException;

    /**
     * Get the input stream starting from <code>fromDLSN</code> for this log.
     *
     * @param fromDLSN the position to start reading from
     * @return the stream starting with <code>fromDLSN</code>
     * @throws IOException if a stream cannot be found.
     */
    public LogReader getInputStream(DLSN fromDLSN) throws IOException;

    /**
     * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
     *
     * @param fromTxnId
     *          transaction id to start reading from
     * @return async log reader
     */
    public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId);

    /**
     * Open an async log reader to read records from a log starting from <code>fromDLSN</code>
     *
     * @param fromDLSN
     *          dlsn to start reading from
     * @return async log reader
     */
    public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);

    // @Deprecated — synchronous-open variant of openAsyncLogReader(long); prefer the latter (TODO confirm).
    public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;

    // @Deprecated — synchronous-open variant of openAsyncLogReader(DLSN); prefer the latter (TODO confirm).
    public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;

    /**
     * Get a log reader with lock starting from <i>fromDLSN</i>.
     * See {@link #getAsyncLogReaderWithLock(DLSN, String)} for the locking semantics.
     *
     * @param fromDLSN start dlsn
     * @return async log reader
     */
    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN);

    /**
     * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
     * If two readers tried to open using same subscriberId, one would succeed, while the other
     * will be blocked until it gets the lock.
     *
     * @param fromDLSN
     *          start dlsn
     * @param subscriberId
     *          subscriber id
     * @return async log reader
     */
    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);

    /**
     * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
     * its last commit position recorded in subscription store. If no last commit position found
     * in subscription store, it would start reading from head of the stream.
     *
     * If the two readers tried to open using same subscriberId, one would succeed, while the other
     * will be blocked until it gets the lock.
     *
     * @param subscriberId
     *          subscriber id
     * @return async log reader
     */
    public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);

    /**
     * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
     *
     * @param transactionId
     *          transaction id
     * @return dlsn of first log record whose transaction id is not less than transactionId.
     */
    public Future<DLSN> getDLSNNotLessThanTxId(long transactionId);

    /**
     * Get the last log record in the stream
     *
     * @return the last log record in the stream
     * @throws IOException if a stream cannot be found.
     */
    public LogRecordWithDLSN getLastLogRecord()
        throws IOException;

    /**
     * Get the earliest Transaction Id available in the log
     *
     * @return earliest transaction id
     * @throws IOException
     */
    public long getFirstTxId() throws IOException;

    /**
     * Get Latest Transaction Id in the log
     *
     * @return latest transaction id
     * @throws IOException
     */
    public long getLastTxId() throws IOException;

    /**
     * Get Latest DLSN in the log
     *
     * @return last dlsn
     * @throws IOException
     */
    public DLSN getLastDLSN() throws IOException;

    /**
     * Get Latest log record with DLSN in the log - async
     *
     * @return latest log record with DLSN
     */
    public Future<LogRecordWithDLSN> getLastLogRecordAsync();

    /**
     * Get Latest Transaction Id in the log - async
     *
     * @return latest transaction id
     */
    public Future<Long> getLastTxIdAsync();

    /**
     * Get first DLSN in the log.
     *
     * @return first dlsn in the stream
     */
    public Future<DLSN> getFirstDLSNAsync();

    /**
     * Get Latest DLSN in the log - async
     *
     * @return latest transaction id
     */
    public Future<DLSN> getLastDLSNAsync();

    /**
     * Get the number of log records in the active portion of the log
     * Any log segments that have already been truncated will not be included
     *
     * @return number of log records
     * @throws IOException
     */
    public long getLogRecordCount() throws IOException;

    /**
     * Get the number of log records in the active portion of the log - async.
     * Any log segments that have already been truncated will not be included
     *
     * @param beginDLSN the position to start counting from
     * @return future number of log records
     * @throws IOException
     */
    public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN);

    /**
     * Run recovery on the log.
     *
     * @throws IOException
     */
    public void recover() throws IOException;

    /**
     * Check if an end of stream marker was added to the stream
     * A stream with an end of stream marker cannot be appended to
     *
     * @return true if the marker was added to the stream, false otherwise
     * @throws IOException
     */
    public boolean isEndOfStreamMarked() throws IOException;

    /**
     * Delete the log.
     *
     * @throws IOException if the deletion fails
     */
    public void delete() throws IOException;

    /**
     * The DistributedLogManager may archive/purge any logs for transactionId
     * less than or equal to minImageTxId.
     * This is to be used only when the client explicitly manages deletion. If
     * the cleanup policy is based on sliding time window, then this method need
     * not be called.
     *
     * @param minTxIdToKeep the earliest txid that must be retained
     * @throws IOException if purging fails
     */
    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;

    /**
     * Get the subscriptions store provided by the distributedlog manager.
     *
     * @return subscriptions store manages subscriptions for current stream.
     */
    public SubscriptionsStore getSubscriptionsStore();

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
new file mode 100644
index 0000000..617282c
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.CompressionCodec;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A set of {@link LogRecord}s.
+ */
+public class Entry {
+
+ /**
+ * Create a new log record set.
+ *
+ * @param logName
+ * name of the log
+ * @param initialBufferSize
+ * initial buffer size
+ * @param envelopeBeforeTransmit
+ * if envelope the buffer before transmit
+ * @param codec
+ * compression codec
+ * @param statsLogger
+ * stats logger to receive stats
+ * @return writer to build a log record set.
+ */
+ public static Writer newEntry(
+ String logName,
+ int initialBufferSize,
+ boolean envelopeBeforeTransmit,
+ CompressionCodec.Type codec,
+ StatsLogger statsLogger) {
+ return new EnvelopedEntryWriter(
+ logName,
+ initialBufferSize,
+ envelopeBeforeTransmit,
+ codec,
+ statsLogger);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Build the record set object.
+ */
+ public static class Builder {
+
+ private long logSegmentSequenceNumber = -1;
+ private long entryId = -1;
+ private long startSequenceId = Long.MIN_VALUE;
+ private boolean envelopeEntry = true;
+ // input stream
+ private InputStream in = null;
+ // or bytes array
+ private byte[] data = null;
+ private int offset = -1;
+ private int length = -1;
+ private Optional<Long> txidToSkipTo = Optional.absent();
+ private Optional<DLSN> dlsnToSkipTo = Optional.absent();
+ private boolean deserializeRecordSet = true;
+
+ private Builder() {}
+
+ /**
+ * Reset the builder.
+ *
+ * @return builder
+ */
+ public Builder reset() {
+ logSegmentSequenceNumber = -1;
+ entryId = -1;
+ startSequenceId = Long.MIN_VALUE;
+ envelopeEntry = true;
+ // input stream
+ in = null;
+ // or bytes array
+ data = null;
+ offset = -1;
+ length = -1;
+ txidToSkipTo = Optional.absent();
+ dlsnToSkipTo = Optional.absent();
+ return this;
+ }
+
+ /**
+ * Set the segment info of the log segment that this record
+ * set belongs to.
+ *
+ * @param lssn
+ * log segment sequence number
+ * @param startSequenceId
+ * start sequence id of this log segment
+ * @return builder
+ */
+ public Builder setLogSegmentInfo(long lssn, long startSequenceId) {
+ this.logSegmentSequenceNumber = lssn;
+ this.startSequenceId = startSequenceId;
+ return this;
+ }
+
+ /**
+ * Set the entry id of this log record set.
+ *
+ * @param entryId
+ * entry id assigned for this log record set.
+ * @return builder
+ */
+ public Builder setEntryId(long entryId) {
+ this.entryId = entryId;
+ return this;
+ }
+
+ /**
+ * Set whether this record set is enveloped or not.
+ *
+ * @param enabled
+ * flag indicates whether this record set is enveloped or not.
+ * @return builder
+ */
+ public Builder setEnvelopeEntry(boolean enabled) {
+ this.envelopeEntry = enabled;
+ return this;
+ }
+
+ /**
+ * Set the serialized bytes data of this record set.
+ *
+ * @param data
+ * serialized bytes data of this record set.
+ * @param offset
+ * offset of the bytes data
+ * @param length
+ * length of the bytes data
+ * @return builder
+ */
+ public Builder setData(byte[] data, int offset, int length) {
+ this.data = data;
+ this.offset = offset;
+ this.length = length;
+ return this;
+ }
+
+ /**
+ * Set the input stream of the serialized bytes data of this record set.
+ *
+ * @param in
+ * input stream
+ * @return builder
+ */
+ public Builder setInputStream(InputStream in) {
+ this.in = in;
+ return this;
+ }
+
+ /**
+ * Set the record set starts from <code>dlsn</code>.
+ *
+ * @param dlsn
+ * dlsn to skip to
+ * @return builder
+ */
+ public Builder skipTo(@Nullable DLSN dlsn) {
+ this.dlsnToSkipTo = Optional.fromNullable(dlsn);
+ return this;
+ }
+
+ /**
+ * Set the record set starts from <code>txid</code>.
+ *
+ * @param txid
+ * txid to skip to
+ * @return builder
+ */
+ public Builder skipTo(long txid) {
+ this.txidToSkipTo = Optional.of(txid);
+ return this;
+ }
+
+ /**
+ * Enable/disable deserialize record set.
+ *
+ * @param enabled
+ * flag to enable/disable dserialize record set.
+ * @return builder
+ */
+ public Builder deserializeRecordSet(boolean enabled) {
+ this.deserializeRecordSet = enabled;
+ return this;
+ }
+
+ public Entry build() {
+ Preconditions.checkNotNull(data, "Serialized data isn't provided");
+ Preconditions.checkArgument(offset >= 0 && length >= 0
+ && (offset + length) <= data.length,
+ "Invalid offset or length of serialized data");
+ return new Entry(
+ logSegmentSequenceNumber,
+ entryId,
+ startSequenceId,
+ envelopeEntry,
+ deserializeRecordSet,
+ data,
+ offset,
+ length,
+ txidToSkipTo,
+ dlsnToSkipTo);
+ }
+
+ public Entry.Reader buildReader() throws IOException {
+ Preconditions.checkArgument(data != null || in != null,
+ "Serialized data or input stream isn't provided");
+ InputStream in;
+ if (null != this.in) {
+ in = this.in;
+ } else {
+ Preconditions.checkArgument(offset >= 0 && length >= 0
+ && (offset + length) <= data.length,
+ "Invalid offset or length of serialized data");
+ in = new ByteArrayInputStream(data, offset, length);
+ }
+ return new EnvelopedEntryReader(
+ logSegmentSequenceNumber,
+ entryId,
+ startSequenceId,
+ in,
+ envelopeEntry,
+ deserializeRecordSet,
+ NullStatsLogger.INSTANCE);
+ }
+
+ }
+
+ private final long logSegmentSequenceNumber;
+ private final long entryId;
+ private final long startSequenceId;
+ private final boolean envelopedEntry;
+ private final boolean deserializeRecordSet;
+ private final byte[] data;
+ private final int offset;
+ private final int length;
+ private final Optional<Long> txidToSkipTo;
+ private final Optional<DLSN> dlsnToSkipTo;
+
+ private Entry(long logSegmentSequenceNumber,
+ long entryId,
+ long startSequenceId,
+ boolean envelopedEntry,
+ boolean deserializeRecordSet,
+ byte[] data,
+ int offset,
+ int length,
+ Optional<Long> txidToSkipTo,
+ Optional<DLSN> dlsnToSkipTo) {
+ this.logSegmentSequenceNumber = logSegmentSequenceNumber;
+ this.entryId = entryId;
+ this.startSequenceId = startSequenceId;
+ this.envelopedEntry = envelopedEntry;
+ this.deserializeRecordSet = deserializeRecordSet;
+ this.data = data;
+ this.offset = offset;
+ this.length = length;
+ this.txidToSkipTo = txidToSkipTo;
+ this.dlsnToSkipTo = dlsnToSkipTo;
+ }
+
+ /**
+ * Get raw data of this record set.
+ *
+ * @return raw data representation of this record set.
+ */
+ public byte[] getRawData() {
+ return data;
+ }
+
+ /**
+ * Create reader to iterate over this record set.
+ *
+ * @return reader to iterate over this record set.
+ * @throws IOException if the record set is invalid record set.
+ */
+ public Reader reader() throws IOException {
+ InputStream in = new ByteArrayInputStream(data, offset, length);
+ Reader reader = new EnvelopedEntryReader(
+ logSegmentSequenceNumber,
+ entryId,
+ startSequenceId,
+ in,
+ envelopedEntry,
+ deserializeRecordSet,
+ NullStatsLogger.INSTANCE);
+ if (txidToSkipTo.isPresent()) {
+ reader.skipTo(txidToSkipTo.get());
+ }
+ if (dlsnToSkipTo.isPresent()) {
+ reader.skipTo(dlsnToSkipTo.get());
+ }
+ return reader;
+ }
+
+ /**
+ * Writer to append {@link LogRecord}s to {@link Entry}.
+ */
+ public interface Writer extends EntryBuffer {
+
+ /**
+ * Write a {@link LogRecord} to this record set.
+ *
+ * @param record
+ * record to write
+ * @param transmitPromise
+ * callback for transmit result. the promise is only
+ * satisfied when this record set is transmitted.
+ * @throws LogRecordTooLongException if the record is too long
+ * @throws WriteException when encountered exception writing the record
+ */
+ void writeRecord(LogRecord record, Promise<DLSN> transmitPromise)
+ throws LogRecordTooLongException, WriteException;
+
+ /**
+ * Reset the writer to write records.
+ */
+ void reset();
+
+ }
+
+ /**
+ * Reader to read {@link LogRecord}s from this record set.
+ */
+ public interface Reader {
+
+ /**
+ * Get the log segment sequence number.
+ *
+ * @return the log segment sequence number.
+ */
+ long getLSSN();
+
+ /**
+ * Return the entry id.
+ *
+ * @return the entry id.
+ */
+ long getEntryId();
+
+ /**
+ * Read next log record from this record set.
+ *
+ * @return next log record from this record set.
+ */
+ LogRecordWithDLSN nextRecord() throws IOException;
+
+ /**
+ * Skip the reader to the record whose transaction id is <code>txId</code>.
+ *
+ * @param txId
+ * transaction id to skip to.
+ * @return true if skip succeeds, otherwise false.
+ * @throws IOException
+ */
+ boolean skipTo(long txId) throws IOException;
+
+ /**
+ * Skip the reader to the record whose DLSN is <code>dlsn</code>.
+ *
+ * @param dlsn
+ * DLSN to skip to.
+ * @return true if skip succeeds, otherwise false.
+ * @throws IOException
+ */
+ boolean skipTo(DLSN dlsn) throws IOException;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
new file mode 100644
index 0000000..c695420
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryBuffer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.distributedlog.io.Buffer;
+import org.apache.distributedlog.io.TransmitListener;
+
+import java.io.IOException;
+
+/**
+ * Write representation of a {@link Entry}.
+ * It is a buffer of log record set, used for transmission.
+ */
/**
 * Write representation of a {@link Entry}.
 * It is a buffer of log record set, used for transmission.
 *
 * <p>Extends {@link TransmitListener}, so implementations also receive
 * transmit-result callbacks (see that interface for the callback contract).
 */
public interface EntryBuffer extends TransmitListener {

    /**
     * Return if this record set contains user records.
     *
     * @return true if this record set contains user records, otherwise
     * return false.
     */
    boolean hasUserRecords();

    /**
     * Return number of records in current record set.
     *
     * @return number of records in current record set.
     */
    int getNumRecords();

    /**
     * Return number of bytes in current record set.
     *
     * @return number of bytes in current record set.
     */
    int getNumBytes();

    /**
     * Return max tx id in current record set.
     *
     * @return max tx id.
     */
    long getMaxTxId();

    /**
     * Get the buffer to transmit.
     *
     * @return the buffer to transmit.
     * @throws InvalidEnvelopedEntryException if the record set buffer is invalid
     * @throws IOException when encountered IOException during serialization
     */
    Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException;

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java
new file mode 100644
index 0000000..218662c
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EntryPosition.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+/**
+ * The position of an entry, identified by log segment sequence number and entry id.
+ */
/**
 * Mutable pointer to a position in a log, expressed as a
 * (log segment sequence number, entry id) pair. All accessors and the
 * {@link #advance(long, long)} mutator are synchronized, so one instance
 * may be shared between threads.
 */
class EntryPosition {

    private long lssn;
    private long entryId;

    EntryPosition(long lssn, long entryId) {
        this.lssn = lssn;
        this.entryId = entryId;
    }

    public synchronized long getLogSegmentSequenceNumber() {
        return lssn;
    }

    public synchronized long getEntryId() {
        return entryId;
    }

    /**
     * Move this position forward to (lssn, entryId) when that pair is
     * strictly ahead of the current position; never move backwards.
     *
     * @param lssn target log segment sequence number
     * @param entryId target entry id
     * @return true when the position moved, false otherwise.
     */
    public synchronized boolean advance(long lssn, long entryId) {
        if (lssn < this.lssn) {
            // Target lies in an older log segment: reject.
            return false;
        }
        if (lssn > this.lssn) {
            // Newer log segment: accept unconditionally, entry ids restart there.
            this.lssn = lssn;
            this.entryId = entryId;
            return true;
        }
        // Same log segment: accept only a strictly larger entry id.
        if (entryId <= this.entryId) {
            return false;
        }
        this.entryId = entryId;
        return true;
    }

    @Override
    public String toString() {
        return "(" + lssn + ", " + entryId + ")";
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
new file mode 100644
index 0000000..eb1e9af
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import org.apache.distributedlog.annotations.DistributedLogAnnotations.Compression;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionUtils;
+import org.apache.distributedlog.util.BitMaskUtils;
+
/**
 * An enveloped entry written to BookKeeper.
 *
 * Data type in brackets. Interpretation should be on the basis of data types and not individual
 * bytes to honor Endianness.
 *
 * Entry Structure:
 * ---------------
 * Bytes 0                                  : Version (Byte)
 * Bytes 1 - (DATA = 1+Header.length-1)     : Header (Integer)
 * Bytes DATA - DATA+3                      : Payload Length (Integer)
 * BYTES DATA+4 - DATA+4+payload.length-1   : Payload (Byte[])
 *
 * V1 Header Structure: // Offsets relative to the start of the header.
 * -------------------
 * Bytes 0 - 3                              : Flags (Integer)
 * Bytes 4 - 7                              : Original payload size before compression (Integer)
 *
 * Flags: // 32 Bits
 * -----
 * 0 ... 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 *                                 |_|
 *                                  |
 *                          Compression Type
 *
 * Compression Type: // 2 Bits (Least significant)
 * ----------------
 * 00      : No Compression
 * 01      : LZ4 Compression
 * 10      : Unused
 * 11      : Unused
 */
public class EnvelopedEntry {

    public static final int VERSION_LENGTH = 1; // One byte long
    public static final byte VERSION_ONE = 1;

    public static final byte LOWEST_SUPPORTED_VERSION = VERSION_ONE;
    public static final byte HIGHEST_SUPPORTED_VERSION = VERSION_ONE;
    public static final byte CURRENT_VERSION = VERSION_ONE;

    // Stats for (de)compression latency and byte counts.
    private final OpStatsLogger compressionStat;
    private final OpStatsLogger decompressionStat;
    private final Counter compressedEntryBytes;
    private final Counter decompressedEntryBytes;
    // Envelope version this instance reads/writes; validated in the constructor.
    private final byte version;

    // Mutable parts of the envelope; replaced during readFully()/writeFully().
    private Header header = new Header();
    private Payload payloadCompressed = new Payload();
    private Payload payloadDecompressed = new Payload();

    /**
     * Construct an envelope for reading: header and payloads are populated
     * later by {@link #readFully(DataInputStream)}.
     *
     * @param version envelope version; must be within the supported range
     * @param statsLogger used for (de)compression stats; must not be null
     * @throws InvalidEnvelopedEntryException if the version is unsupported
     */
    public EnvelopedEntry(byte version,
                          StatsLogger statsLogger) throws InvalidEnvelopedEntryException {
        Preconditions.checkNotNull(statsLogger);
        if (version < LOWEST_SUPPORTED_VERSION || version > HIGHEST_SUPPORTED_VERSION) {
            throw new InvalidEnvelopedEntryException("Invalid enveloped entry version " + version + ", expected to be in [ "
                    + LOWEST_SUPPORTED_VERSION + " ~ " + HIGHEST_SUPPORTED_VERSION + " ]");
        }
        this.version = version;
        this.compressionStat = statsLogger.getOpStatsLogger("compression_time");
        this.decompressionStat = statsLogger.getOpStatsLogger("decompression_time");
        this.compressedEntryBytes = statsLogger.getCounter("compressed_bytes");
        this.decompressedEntryBytes = statsLogger.getCounter("decompressed_bytes");
    }

    /**
     * Construct an envelope for writing.
     *
     * @param statsLogger
     *          Used for getting stats for (de)compression time
     * @param compressionType
     *          The compression type to use
     * @param decompressed
     *          The decompressed payload
     *          NOTE: The size of the byte array passed as the decompressed payload can be larger
     *                than the actual contents to be compressed.
     */
    public EnvelopedEntry(byte version,
                          CompressionCodec.Type compressionType,
                          byte[] decompressed,
                          int length,
                          StatsLogger statsLogger)
            throws InvalidEnvelopedEntryException {
        this(version, statsLogger);
        Preconditions.checkNotNull(compressionType);
        Preconditions.checkNotNull(decompressed);
        Preconditions.checkArgument(length >= 0, "Invalid bytes length " + length);

        this.header = new Header(compressionType, length);
        this.payloadDecompressed = new Payload(length, decompressed);
    }

    // True once both the header and the decompressed payload are populated,
    // i.e. the envelope is usable for writing or its payload for reading out.
    private boolean isReady() {
        return (header.ready && payloadDecompressed.ready);
    }

    /**
     * Serialize the envelope: version byte, header, then the compressed payload.
     * The payload is compressed here with the codec selected by the header.
     *
     * @throws IOException if the envelope is not ready or the stream fails
     */
    @Compression
    public void writeFully(DataOutputStream out) throws IOException {
        Preconditions.checkNotNull(out);
        if (!isReady()) {
            throw new IOException("Entry not writable");
        }
        // Version
        out.writeByte(version);
        // Header
        header.write(out);
        // Compress
        CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
        byte[] compressed = codec.compress(
                payloadDecompressed.payload,
                0,
                payloadDecompressed.length,
                compressionStat);
        this.payloadCompressed = new Payload(compressed.length, compressed);
        this.compressedEntryBytes.add(payloadCompressed.length);
        this.decompressedEntryBytes.add(payloadDecompressed.length);
        payloadCompressed.write(out);
    }

    /**
     * Deserialize an envelope previously written by {@link #writeFully}:
     * validates the version byte, reads the header and compressed payload,
     * then decompresses the payload with the codec named in the header.
     *
     * @throws IOException on version mismatch, unsupported codec, or stream failure
     */
    @Compression
    public void readFully(DataInputStream in) throws IOException {
        Preconditions.checkNotNull(in);
        // Make sure we're reading the right versioned entry.
        byte version = in.readByte();
        if (version != this.version) {
            throw new IOException(String.format("Version mismatch while reading. Received: %d," +
                    " Required: %d", version, this.version));
        }
        header.read(in);
        payloadCompressed.read(in);
        // Decompress
        CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
        byte[] decompressed = codec.decompress(
                payloadCompressed.payload,
                0,
                payloadCompressed.length,
                header.decompressedSize,
                decompressionStat);
        this.payloadDecompressed = new Payload(decompressed.length, decompressed);
        this.compressedEntryBytes.add(payloadCompressed.length);
        this.decompressedEntryBytes.add(payloadDecompressed.length);
    }

    /**
     * @return the raw decompressed payload bytes (not a copy)
     * @throws IOException if the payload has not been initialized yet
     */
    public byte[] getDecompressedPayload() throws IOException {
        if (!isReady()) {
            throw new IOException("Decompressed payload is not initialized");
        }
        return payloadDecompressed.payload;
    }

    /**
     * V1 envelope header: a 32-bit flags word (low 2 bits select the codec)
     * followed by the original (pre-compression) payload size.
     */
    public static class Header {
        public static final int COMPRESSION_CODEC_MASK = 0x3;
        public static final int COMPRESSION_CODEC_NONE = 0x0;
        public static final int COMPRESSION_CODEC_LZ4 = 0x1;

        private int flags = 0;
        private int decompressedSize = 0;
        private CompressionCodec.Type compressionType = CompressionCodec.Type.UNKNOWN;

        // Whether this struct is ready for reading/writing.
        private boolean ready = false;

        // Used while reading.
        public Header() {
        }

        public Header(CompressionCodec.Type compressionType,
                      int decompressedSize) {
            this.compressionType = compressionType;
            this.decompressedSize = decompressedSize;
            this.flags = 0;
            switch (compressionType) {
                case NONE:
                    this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
                            COMPRESSION_CODEC_NONE);
                    break;
                case LZ4:
                    this.flags = (int) BitMaskUtils.set(flags, COMPRESSION_CODEC_MASK,
                            COMPRESSION_CODEC_LZ4);
                    break;
                default:
                    throw new RuntimeException(String.format("Unknown Compression Type: %s",
                            compressionType));
            }
            // This can now be written.
            this.ready = true;
        }

        private void write(DataOutputStream out) throws IOException {
            out.writeInt(flags);
            out.writeInt(decompressedSize);
        }

        private void read(DataInputStream in) throws IOException {
            this.flags = in.readInt();
            // Decode the codec from the low bits of the flags word.
            int compressionType = (int) BitMaskUtils.get(flags, COMPRESSION_CODEC_MASK);
            if (compressionType == COMPRESSION_CODEC_NONE) {
                this.compressionType = CompressionCodec.Type.NONE;
            } else if (compressionType == COMPRESSION_CODEC_LZ4) {
                this.compressionType = CompressionCodec.Type.LZ4;
            } else {
                throw new IOException(String.format("Unsupported Compression Type: %s",
                        compressionType));
            }
            this.decompressedSize = in.readInt();
            // Values can now be read.
            this.ready = true;
        }
    }

    /**
     * Length-prefixed byte payload. Only the first {@code length} bytes of
     * {@code payload} are significant on the write path.
     */
    public static class Payload {
        private int length = 0;
        private byte[] payload = null;

        // Whether this struct is ready for reading/writing.
        private boolean ready = false;

        // Used for reading
        Payload() {
        }

        Payload(int length, byte[] payload) {
            this.length = length;
            this.payload = payload;
            this.ready = true;
        }

        private void write(DataOutputStream out) throws IOException {
            out.writeInt(length);
            out.write(payload, 0, length);
        }

        private void read(DataInputStream in) throws IOException {
            // NOTE(review): length is read from the stream and allocated without an
            // upper-bound check; a corrupt length would drive a large allocation here.
            this.length = in.readInt();
            this.payload = new byte[length];
            in.readFully(payload);
            this.ready = true;
        }
    }

    /**
     * Return an InputStream that reads from the provided InputStream, decompresses the data
     * and returns a new InputStream wrapping the underlying payload.
     *
     * Note that src is modified by this call.
     *
     * @return
     *      New Input stream with the underlying payload.
     * @throws Exception
     */
    public static InputStream fromInputStream(InputStream src,
                                              StatsLogger statsLogger) throws IOException {
        // Peek at the version byte, then rewind so readFully() re-validates it.
        // assumes src supports mark/reset — TODO confirm at call sites.
        src.mark(VERSION_LENGTH);
        byte version = new DataInputStream(src).readByte();
        src.reset();
        EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger);
        entry.readFully(new DataInputStream(src));
        return new ByteArrayInputStream(entry.getDecompressedPayload());
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
new file mode 100644
index 0000000..1761de5
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.bookkeeper.stats.StatsLogger;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Record reader to read records from an enveloped entry buffer.
+ */
+class EnvelopedEntryReader implements Entry.Reader, RecordStream {
+
+ private final long logSegmentSeqNo;
+ private final long entryId;
+ private final LogRecord.Reader reader;
+
+ // slot id
+ private long slotId = 0;
+
+ EnvelopedEntryReader(long logSegmentSeqNo,
+ long entryId,
+ long startSequenceId,
+ InputStream in,
+ boolean envelopedEntry,
+ boolean deserializeRecordSet,
+ StatsLogger statsLogger)
+ throws IOException {
+ this.logSegmentSeqNo = logSegmentSeqNo;
+ this.entryId = entryId;
+ InputStream src = in;
+ if (envelopedEntry) {
+ src = EnvelopedEntry.fromInputStream(in, statsLogger);
+ }
+ this.reader = new LogRecord.Reader(
+ this,
+ new DataInputStream(src),
+ startSequenceId,
+ deserializeRecordSet);
+ }
+
+ @Override
+ public long getLSSN() {
+ return logSegmentSeqNo;
+ }
+
+ @Override
+ public long getEntryId() {
+ return entryId;
+ }
+
+ @Override
+ public LogRecordWithDLSN nextRecord() throws IOException {
+ return reader.readOp();
+ }
+
+ @Override
+ public boolean skipTo(long txId) throws IOException {
+ return reader.skipTo(txId, true);
+ }
+
+ @Override
+ public boolean skipTo(DLSN dlsn) throws IOException {
+ return reader.skipTo(dlsn);
+ }
+
+ //
+ // Record Stream
+ //
+
+ @Override
+ public void advance(int numRecords) {
+ slotId += numRecords;
+ }
+
+ @Override
+ public DLSN getCurrentPosition() {
+ return new DLSN(logSegmentSeqNo, entryId, slotId);
+ }
+
+ @Override
+ public String getName() {
+ return "EnvelopedReader";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
new file mode 100644
index 0000000..54858d7
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.Entry.Writer;
+import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteCancelledException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.Buffer;
+import org.apache.distributedlog.io.CompressionCodec;
+import com.twitter.util.Promise;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+
+/**
+ * {@link org.apache.distributedlog.io.Buffer} based log record set writer.
+ */
+class EnvelopedEntryWriter implements Writer {
+
+ static final Logger logger = LoggerFactory.getLogger(EnvelopedEntryWriter.class);
+
+ private static class WriteRequest {
+
+ private final int numRecords;
+ private final Promise<DLSN> promise;
+
+ WriteRequest(int numRecords, Promise<DLSN> promise) {
+ this.numRecords = numRecords;
+ this.promise = promise;
+ }
+
+ }
+
+ private final String logName;
+ private final Buffer buffer;
+ private final LogRecord.Writer writer;
+ private final List<WriteRequest> writeRequests;
+ private final boolean envelopeBeforeTransmit;
+ private final CompressionCodec.Type codec;
+ private final StatsLogger statsLogger;
+ private int count = 0;
+ private boolean hasUserData = false;
+ private long maxTxId = Long.MIN_VALUE;
+
+ EnvelopedEntryWriter(String logName,
+ int initialBufferSize,
+ boolean envelopeBeforeTransmit,
+ CompressionCodec.Type codec,
+ StatsLogger statsLogger) {
+ this.logName = logName;
+ this.buffer = new Buffer(initialBufferSize * 6 / 5);
+ this.writer = new LogRecord.Writer(new DataOutputStream(buffer));
+ this.writeRequests = new LinkedList<WriteRequest>();
+ this.envelopeBeforeTransmit = envelopeBeforeTransmit;
+ this.codec = codec;
+ this.statsLogger = statsLogger;
+ }
+
+ @Override
+ public synchronized void reset() {
+ cancelPromises(new WriteCancelledException(logName, "Record Set is reset"));
+ count = 0;
+ this.buffer.reset();
+ }
+
+ @Override
+ public synchronized void writeRecord(LogRecord record,
+ Promise<DLSN> transmitPromise)
+ throws LogRecordTooLongException, WriteException {
+ int logRecordSize = record.getPersistentSize();
+ if (logRecordSize > MAX_LOGRECORD_SIZE) {
+ throw new LogRecordTooLongException(
+ "Log Record of size " + logRecordSize + " written when only "
+ + MAX_LOGRECORD_SIZE + " is allowed");
+ }
+
+ try {
+ this.writer.writeOp(record);
+ int numRecords = 1;
+ if (!record.isControl()) {
+ hasUserData = true;
+ }
+ if (record.isRecordSet()) {
+ numRecords = LogRecordSet.numRecords(record);
+ }
+ count += numRecords;
+ writeRequests.add(new WriteRequest(numRecords, transmitPromise));
+ maxTxId = Math.max(maxTxId, record.getTransactionId());
+ } catch (IOException e) {
+ logger.error("Failed to append record to record set of {} : ",
+ logName, e);
+ throw new WriteException(logName, "Failed to append record to record set of "
+ + logName);
+ }
+ }
+
+ private synchronized void satisfyPromises(long lssn, long entryId) {
+ long nextSlotId = 0;
+ for (WriteRequest request : writeRequests) {
+ request.promise.setValue(new DLSN(lssn, entryId, nextSlotId));
+ nextSlotId += request.numRecords;
+ }
+ writeRequests.clear();
+ }
+
+ private synchronized void cancelPromises(Throwable reason) {
+ for (WriteRequest request : writeRequests) {
+ request.promise.setException(reason);
+ }
+ writeRequests.clear();
+ }
+
+ @Override
+ public synchronized long getMaxTxId() {
+ return maxTxId;
+ }
+
+ @Override
+ public synchronized boolean hasUserRecords() {
+ return hasUserData;
+ }
+
+ @Override
+ public int getNumBytes() {
+ return buffer.size();
+ }
+
+ @Override
+ public synchronized int getNumRecords() {
+ return count;
+ }
+
+ @Override
+ public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IOException {
+ if (!envelopeBeforeTransmit) {
+ return buffer;
+ }
+ // We can't escape this allocation because things need to be read from one byte array
+ // and then written to another. This is the destination.
+ Buffer toSend = new Buffer(buffer.size());
+ byte[] decompressed = buffer.getData();
+ int length = buffer.size();
+ EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
+ codec,
+ decompressed,
+ length,
+ statsLogger);
+ // This will cause an allocation of a byte[] for compression. This can be avoided
+ // but we can do that later only if needed.
+ entry.writeFully(new DataOutputStream(toSend));
+ return toSend;
+ }
+
+ @Override
+ public synchronized DLSN finalizeTransmit(long lssn, long entryId) {
+ return new DLSN(lssn, entryId, count - 1);
+ }
+
+ @Override
+ public void completeTransmit(long lssn, long entryId) {
+ satisfyPromises(lssn, entryId);
+ }
+
+ @Override
+ public void abortTransmit(Throwable reason) {
+ cancelPromises(reason);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java
new file mode 100644
index 0000000..f94495f
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LedgerReadPosition.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LedgerReadPosition {
+ static final Logger LOG = LoggerFactory.getLogger(LedgerReadPosition.class);
+
+ private static enum PartialOrderingComparisonResult {
+ NotComparable,
+ GreaterThan,
+ LessThan,
+ EqualTo
+ }
+
+ long ledgerId = DistributedLogConstants.UNRESOLVED_LEDGER_ID;
+ long logSegmentSequenceNo;
+ long entryId;
+
+ public LedgerReadPosition(long ledgerId, long logSegmentSequenceNo, long entryId) {
+ this.ledgerId = ledgerId;
+ this.logSegmentSequenceNo = logSegmentSequenceNo;
+ this.entryId = entryId;
+ }
+
+ public LedgerReadPosition(LedgerReadPosition that) {
+ this.ledgerId = that.ledgerId;
+ this.logSegmentSequenceNo = that.logSegmentSequenceNo;
+ this.entryId = that.entryId;
+ }
+
+
+ public LedgerReadPosition(final DLSN dlsn) {
+ this(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId());
+ }
+
+ public LedgerReadPosition(long logSegmentSequenceNo, long entryId) {
+ this.logSegmentSequenceNo = logSegmentSequenceNo;
+ this.entryId = entryId;
+ }
+
+ public long getLedgerId() {
+ if (DistributedLogConstants.UNRESOLVED_LEDGER_ID == ledgerId) {
+ LOG.trace("Ledger Id is not initialized");
+ throw new IllegalStateException("Ledger Id is not initialized");
+ }
+ return ledgerId;
+ }
+
+ public long getLogSegmentSequenceNumber() {
+ return logSegmentSequenceNo;
+ }
+
+ public long getEntryId() {
+ return entryId;
+ }
+
+ public void advance() {
+ entryId++;
+ }
+
+ public void positionOnNewLogSegment(long ledgerId, long logSegmentSequenceNo) {
+ this.ledgerId = ledgerId;
+ this.logSegmentSequenceNo = logSegmentSequenceNo;
+ this.entryId = 0L;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("(lid=%d, lseqNo=%d, eid=%d)", ledgerId, logSegmentSequenceNo, entryId);
+ }
+
+ public boolean definitelyLessThanOrEqualTo(LedgerReadPosition threshold) {
+ PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold);
+ return ((result == PartialOrderingComparisonResult.LessThan) ||
+ (result == PartialOrderingComparisonResult.EqualTo));
+ }
+
+ public boolean definitelyLessThan(LedgerReadPosition threshold) {
+ PartialOrderingComparisonResult result = comparePartiallyOrdered(threshold);
+ return result == PartialOrderingComparisonResult.LessThan;
+ }
+
+ private PartialOrderingComparisonResult comparePartiallyOrdered(LedgerReadPosition threshold) {
+ // If no threshold is passed we cannot make a definitive comparison
+ if (null == threshold) {
+ return PartialOrderingComparisonResult.NotComparable;
+ }
+
+ if (this.logSegmentSequenceNo != threshold.logSegmentSequenceNo) {
+ if (this.logSegmentSequenceNo < threshold.logSegmentSequenceNo) {
+ return PartialOrderingComparisonResult.LessThan;
+ } else {
+ return PartialOrderingComparisonResult.GreaterThan;
+ }
+ } else if (this.ledgerId != threshold.ledgerId) {
+ // When logSegmentSequenceNo is equal we cannot definitely say that this
+ // position is less than the threshold unless ledgerIds are equal
+ // since LogSegmentSequenceNumber maybe inferred from transactionIds in older
+ // versions of the metadata.
+ return PartialOrderingComparisonResult.NotComparable;
+ } else if (this.getEntryId() < threshold.getEntryId()) {
+ return PartialOrderingComparisonResult.LessThan;
+ } else if (this.getEntryId() > threshold.getEntryId()) {
+ return PartialOrderingComparisonResult.GreaterThan;
+ } else {
+ return PartialOrderingComparisonResult.EqualTo;
+ }
+ }
+
+ /**
+ * Comparator for the key portion
+ */
+ public static final ReadAheadCacheKeyComparator COMPARATOR = new ReadAheadCacheKeyComparator();
+
+ // Only compares the key portion
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof LedgerReadPosition)) {
+ return false;
+ }
+ LedgerReadPosition key = (LedgerReadPosition) other;
+ return ledgerId == key.ledgerId &&
+ entryId == key.entryId;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (ledgerId * 13 ^ entryId * 17);
+ }
+
+ /**
+ * Compare EntryKey.
+ */
+ protected static class ReadAheadCacheKeyComparator implements Comparator<LedgerReadPosition>, Serializable {
+
+ private static final long serialVersionUID = 0L;
+
+ @Override
+ public int compare(LedgerReadPosition left, LedgerReadPosition right) {
+ long ret = left.ledgerId - right.ledgerId;
+ if (ret == 0) {
+ ret = left.entryId - right.entryId;
+ }
+ return (ret < 0) ? -1 : ((ret > 0) ? 1 : 0);
+ }
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
new file mode 100644
index 0000000..5623525
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.google.common.base.Optional;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class for setting up bookkeeper ensembles
+ * and bringing individual bookies up and down
+ */
+public class LocalDLMEmulator {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalDLMEmulator.class);
+
+ public static final String DLOG_NAMESPACE = "/messaging/distributedlog";
+
+ private static final int DEFAULT_BOOKIE_INITIAL_PORT = 0; // Use ephemeral ports
+ private static final int DEFAULT_ZK_TIMEOUT_SEC = 10;
+ private static final int DEFAULT_ZK_PORT = 2181;
+ private static final String DEFAULT_ZK_HOST = "127.0.0.1";
+ private static final String DEFAULT_ZK_ENSEMBLE = DEFAULT_ZK_HOST + ":" + DEFAULT_ZK_PORT;
+ private static final int DEFAULT_NUM_BOOKIES = 3;
+ private static final ServerConfiguration DEFAULT_SERVER_CONFIGURATION = new ServerConfiguration();
+
+ private final String zkEnsemble;
+ private final URI uri;
+ private final List<File> tmpDirs = new ArrayList<File>();
+ private final int zkTimeoutSec;
+ private final Thread bkStartupThread;
+ private final String zkHost;
+ private final int zkPort;
+ private final int numBookies;
+
+ public static class Builder {
+ private int zkTimeoutSec = DEFAULT_ZK_TIMEOUT_SEC;
+ private int numBookies = DEFAULT_NUM_BOOKIES;
+ private String zkHost = DEFAULT_ZK_HOST;
+ private int zkPort = DEFAULT_ZK_PORT;
+ private int initialBookiePort = DEFAULT_BOOKIE_INITIAL_PORT;
+ private boolean shouldStartZK = true;
+ private Optional<ServerConfiguration> serverConf = Optional.absent();
+
+ public Builder numBookies(int numBookies) {
+ this.numBookies = numBookies;
+ return this;
+ }
+ public Builder zkHost(String zkHost) {
+ this.zkHost = zkHost;
+ return this;
+ }
+ public Builder zkPort(int zkPort) {
+ this.zkPort = zkPort;
+ return this;
+ }
+ public Builder zkTimeoutSec(int zkTimeoutSec) {
+ this.zkTimeoutSec = zkTimeoutSec;
+ return this;
+ }
+ public Builder initialBookiePort(int initialBookiePort) {
+ this.initialBookiePort = initialBookiePort;
+ return this;
+ }
+ public Builder shouldStartZK(boolean shouldStartZK) {
+ this.shouldStartZK = shouldStartZK;
+ return this;
+ }
+ public Builder serverConf(ServerConfiguration serverConf) {
+ this.serverConf = Optional.of(serverConf);
+ return this;
+ }
+
+ public LocalDLMEmulator build() throws Exception {
+ ServerConfiguration conf = null;
+ if (serverConf.isPresent()) {
+ conf = serverConf.get();
+ } else {
+ conf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone();
+ conf.setZkTimeout(zkTimeoutSec * 1000);
+ }
+ ServerConfiguration newConf = new ServerConfiguration();
+ newConf.loadConf(conf);
+ newConf.setAllowLoopback(true);
+
+ return new LocalDLMEmulator(numBookies, shouldStartZK, zkHost, zkPort,
+ initialBookiePort, zkTimeoutSec, newConf);
+ }
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort, final int zkTimeoutSec, final ServerConfiguration serverConf) throws Exception {
+ this.numBookies = numBookies;
+ this.zkHost = zkHost;
+ this.zkPort = zkPort;
+ this.zkEnsemble = zkHost + ":" + zkPort;
+ this.uri = URI.create("distributedlog://" + zkEnsemble + DLOG_NAMESPACE);
+ this.zkTimeoutSec = zkTimeoutSec;
+ this.bkStartupThread = new Thread() {
+ public void run() {
+ try {
+ LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback());
+ LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf);
+ LOG.info("{} bookies are started.");
+ } catch (InterruptedException e) {
+ // go away quietly
+ } catch (Exception e) {
+ LOG.error("Error starting local bk", e);
+ }
+ }
+ };
+ }
+
+ public void start() throws Exception {
+ bkStartupThread.start();
+ if (!LocalBookKeeper.waitForServerUp(zkEnsemble, zkTimeoutSec*1000)) {
+ throw new Exception("Error starting zookeeper/bookkeeper");
+ }
+ int bookiesUp = checkBookiesUp(numBookies, zkTimeoutSec);
+ assert (numBookies == bookiesUp);
+ // Provision "/messaging/distributedlog" namespace
+ DLMetadata.create(new BKDLConfig(zkEnsemble, "/ledgers")).create(uri);
+ }
+
+ public void teardown() throws Exception {
+ if (bkStartupThread != null) {
+ bkStartupThread.interrupt();
+ bkStartupThread.join();
+ }
+ for (File dir : tmpDirs) {
+ FileUtils.deleteDirectory(dir);
+ }
+ }
+
+ public String getZkServers() {
+ return zkEnsemble;
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public BookieServer newBookie() throws Exception {
+ ServerConfiguration bookieConf = new ServerConfiguration();
+ bookieConf.setZkTimeout(zkTimeoutSec * 1000);
+ bookieConf.setBookiePort(0);
+ bookieConf.setAllowLoopback(true);
+ File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_",
+ "test");
+ if (!tmpdir.delete()) {
+ LOG.debug("Fail to delete tmpdir " + tmpdir);
+ }
+ if (!tmpdir.mkdir()) {
+ throw new IOException("Fail to create tmpdir " + tmpdir);
+ }
+ tmpDirs.add(tmpdir);
+
+ bookieConf.setZkServers(zkEnsemble);
+ bookieConf.setJournalDirName(tmpdir.getPath());
+ bookieConf.setLedgerDirNames(new String[]{tmpdir.getPath()});
+
+ BookieServer b = new BookieServer(bookieConf);
+ b.start();
+ for (int i = 0; i < 10 && !b.isRunning(); i++) {
+ Thread.sleep(10000);
+ }
+ if (!b.isRunning()) {
+ throw new IOException("Bookie would not start");
+ }
+ return b;
+ }
+
+ /**
+ * Check that a number of bookies are available
+ *
+ * @param count number of bookies required
+ * @param timeout number of seconds to wait for bookies to start
+ * @throws java.io.IOException if bookies are not started by the time the timeout hits
+ */
+ public int checkBookiesUp(int count, int timeout) throws Exception {
+ ZooKeeper zkc = connectZooKeeper(zkHost, zkPort, zkTimeoutSec);
+ try {
+ int mostRecentSize = 0;
+ for (int i = 0; i < timeout; i++) {
+ try {
+ List<String> children = zkc.getChildren("/ledgers/available",
+ false);
+ children.remove("readonly");
+ mostRecentSize = children.size();
+ if ((mostRecentSize > count) || LOG.isDebugEnabled()) {
+ LOG.info("Found " + mostRecentSize + " bookies up, "
+ + "waiting for " + count);
+ if ((mostRecentSize > count) || LOG.isTraceEnabled()) {
+ for (String child : children) {
+ LOG.info(" server: " + child);
+ }
+ }
+ }
+ if (mostRecentSize == count) {
+ break;
+ }
+ } catch (KeeperException e) {
+ // ignore
+ }
+ Thread.sleep(1000);
+ }
+ return mostRecentSize;
+ } finally {
+ zkc.close();
+ }
+ }
+
+ public static String getBkLedgerPath() {
+ return "/ledgers";
+ }
+
+ public static ZooKeeper connectZooKeeper(String zkHost, int zkPort)
+ throws IOException, KeeperException, InterruptedException {
+ return connectZooKeeper(zkHost, zkPort, DEFAULT_ZK_TIMEOUT_SEC);
+ }
+
+ public static ZooKeeper connectZooKeeper(String zkHost, int zkPort, int zkTimeoutSec)
+ throws IOException, KeeperException, InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final String zkHostPort = zkHost + ":" + zkPort;
+
+ ZooKeeper zkc = new ZooKeeper(zkHostPort, zkTimeoutSec * 1000, new Watcher() {
+ public void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.SyncConnected) {
+ latch.countDown();
+ }
+ }
+ });
+ if (!latch.await(zkTimeoutSec, TimeUnit.SECONDS)) {
+ throw new IOException("Zookeeper took too long to connect");
+ }
+ return zkc;
+ }
+
+ public static URI createDLMURI(String path) throws Exception {
+ return createDLMURI(DEFAULT_ZK_ENSEMBLE, path);
+ }
+
+ public static URI createDLMURI(String zkServers, String path) throws Exception {
+ return URI.create("distributedlog://" + zkServers + DLOG_NAMESPACE + path);
+ }
+
+ /**
+ * Try to start zookkeeper locally on any port.
+ */
+ public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(File zkDir) throws Exception {
+ return runZookeeperOnAnyPort((int) (Math.random()*10000+7000), zkDir);
+ }
+
+ /**
+ * Try to start zookkeeper locally on any port beginning with some base port.
+ * Dump some socket info when bind fails.
+ */
+ public static Pair<ZooKeeperServerShim, Integer> runZookeeperOnAnyPort(int basePort, File zkDir) throws Exception {
+
+ final int MAX_RETRIES = 20;
+ final int MIN_PORT = 1025;
+ final int MAX_PORT = 65535;
+ ZooKeeperServerShim zks = null;
+ int zkPort = basePort;
+ boolean success = false;
+ int retries = 0;
+
+ while (!success) {
+ try {
+ LOG.info("zk trying to bind to port " + zkPort);
+ zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkDir);
+ success = true;
+ } catch (BindException be) {
+ retries++;
+ if (retries > MAX_RETRIES) {
+ throw be;
+ }
+ zkPort++;
+ if (zkPort > MAX_PORT) {
+ zkPort = MIN_PORT;
+ }
+ }
+ }
+
+ return Pair.of(zks, zkPort);
+ }
+
+ public static void main(String[] args) throws Exception {
+ try {
+ if (args.length < 1) {
+ System.out.println("Usage: LocalDLEmulator <zk_port>");
+ System.exit(-1);
+ }
+
+ final int zkPort = Integer.parseInt(args[0]);
+ final File zkDir = IOUtils.createTempDir("distrlog", "zookeeper");
+ final LocalDLMEmulator localDlm = LocalDLMEmulator.newBuilder()
+ .zkPort(zkPort)
+ .build();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ localDlm.teardown();
+ FileUtils.deleteDirectory(zkDir);
+ System.out.println("ByeBye!");
+ } catch (Exception e) {
+ // do nothing
+ }
+ }
+ });
+ localDlm.start();
+
+ System.out.println(String.format(
+ "DistributedLog Sandbox is running now. You could access distributedlog://%s:%s",
+ DEFAULT_ZK_HOST,
+ zkPort));
+ } catch (Exception ex) {
+ System.out.println("Exception occurred running emulator " + ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
new file mode 100644
index 0000000..75a32ef
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import org.apache.distributedlog.io.AsyncCloseable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
+ *
+ * <h3>Lifecycle of a Reader</h3>
+ *
+ * A reader is a <i>sequential</i> reader that read records from a DL log starting
+ * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)}
+ * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}.
+ * <p>
+ * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
+ * to read records out the log from provided position.
+ * <p>
+ * Closing the reader (via {@link #close()} will release all the resources occupied
+ * by this reader instance.
+ * <p>
+ * Exceptions could be thrown during reading records. Once the exception is thrown,
+ * the reader is set to an error state and it isn't usable anymore. It is the application's
+ * responsibility to handle the exceptions and re-create readers if necessary.
+ * <p>
+ * Example:
+ * <pre>
+ * DistributedLogManager dlm = ...;
+ * long nextTxId = ...;
+ * LogReader reader = dlm.getInputStream(nextTxId);
+ *
+ * while (true) { // keep reading & processing records
+ * LogRecord record;
+ * try {
+ * record = reader.readNext(false);
+ * nextTxId = record.getTransactionId();
+ * // process the record
+ * ...
+ * } catch (IOException ioe) {
+ * // handle the exception
+ * ...
+ * reader = dlm.getInputStream(nextTxId + 1);
+ * }
+ * }
+ *
+ * </pre>
+ *
+ * <h3>Read Records</h3>
+ *
+ * Reading records from an <i>endless</i> log in a `synchronous` way isn't as
+ * trivial as in the `asynchronous` way (via {@link AsyncLogReader}), because it
+ * lacks a callback mechanism. LogReader introduces a flag `nonBlocking` for
+ * controlling the <i>waiting</i> behavior on `synchronous` reads.
+ *
+ * <h4>Blocking vs NonBlocking</h4>
+ *
+ * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
+ * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true)
+ * means the reads will only check readahead cache and return whatever records
+ * available in the readahead cache.
+ * <p>
+ * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is
+ * catching up with writer (there are records in the log), the read call will
+ * wait until records are read and returned. If the reader is caught up with
+ * writer (there are no more records in the log at read time), the read call
+ * will wait for a small period of time (defined in
+ * {@link DistributedLogConfiguration#getReadAheadWaitTime()}) and return whatever
+ * records available in the readahead cache. In other words, if a reader sees
+ * no record on blocking reads, it means the reader is `caught-up` with the
+ * writer.
+ * <p>
+ * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
+ * state machines. Applications could use <i>blocking</i> reads till caught up
+ * with latest data. Once they are caught up with latest data, they could start
+ * serving their service and turn to <i>non-blocking</i> read mode and tail read
+ * data from the logs.
+ * <p>
+ * See examples below.
+ *
+ * <h4>Read Single Record</h4>
+ *
+ * {@link #readNext(boolean)} is reading individual records from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ *
+ * // keep reading records in blocking way until no records available in the log
+ * LogRecord record = reader.readNext(false);
+ * while (null != record) {
+ * // process the record
+ * ...
+ * // read next record
+ * records = reader.readNext(false);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ * record = reader.readNext(true)
+ * // process the new records
+ * ...
+ * }
+ * </pre>
+ *
+ * <h4>Read Batch of Records</h4>
+ *
+ * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
+ * from a DL log.
+ *
+ * <pre>
+ * LogReader reader = ...
+ * int N = 10;
+ *
+ * // keep reading N records in blocking way until no records available in the log
+ * List<LogRecord> records = reader.readBulk(false, N);
+ * while (!records.isEmpty()) {
+ * // process the list of records
+ * ...
+ * if (records.size() < N) { // no more records available in the log
+ * break;
+ * }
+ * // read next N records
+ * records = reader.readBulk(false, N);
+ * }
+ *
+ * ...
+ *
+ * // reader is caught up with writer, doing non-blocking reads to tail the log
+ * while (true) {
+ * records = reader.readBulk(true, N)
+ * // process the new records
+ * ...
+ * }
+ *
+ * </pre>
+ *
+ * @see AsyncLogReader
+ *
+ * NOTE:
+ * 1. Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
+ * the {@link AsyncCloseable} interface so the reader could be closed asynchronously
+ */
public interface LogReader extends Closeable, AsyncCloseable {

    /**
     * Read the next log record from the stream.
     * <p>
     * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
     * records from the read ahead cache. It returns <i>null</i> if there aren't any records
     * available in the read ahead cache.
     * <p>
     * If <i>nonBlocking</i> is set to false, it makes a blocking call. The call will
     * block until a record is returned if there are records in the stream (aka catching up).
     * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
     * milliseconds and return null if there aren't any more records in the stream.
     *
     * @param nonBlocking should the read make blocking calls to the backend or rely on the
     *                    readAhead cache
     * @return a record from the stream, or null if at end of stream
     * @throws IOException if there is an error reading from the stream
     */
    public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;

    /**
     * Read the next <i>numLogRecords</i> log records from the stream.
     *
     * @param nonBlocking should the read make blocking calls to the backend or rely on the
     *                    readAhead cache
     * @param numLogRecords maximum number of log records returned by this call.
     * @return a list of records from the stream, or an empty list if at end of stream
     * @throws IOException if there is an error reading from the stream
     * @see #readNext(boolean)
     */
    public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
}